php使用RabbitMQ
作者QQ:67065435 QQ群:821635552
本站内容全部为作者原创,转载请注明出处!
引入相关依赖包(php-amqplib,版本 ^2.7)
composer require php-amqplib/php-amqplib
命令行内实现
<?php

// Composer autoloader; adjust the path if this script does not live next to vendor/.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

///////////////////////////////////////////////////////////////////////////
// Producer: declares the queue/exchange, binds them and publishes one
// persistent text message.
///////////////////////////////////////////////////////////////////////////

$msg = '这是要发送的消息';

$rabbitmq_host = '127.0.0.1';
$rabbitmq_port = 5672;
$rabbitmq_user = 'guest';
$rabbitmq_pswd = 'guest';
$rabbitmq_path = '/';      // virtual host
$rabbitmq_excg = 'test';   // exchange name
$rabbitmq_queu = 'test';   // queue name
$exchange_type = 'direct';

$passive     = false;
$durable     = true;
$exclusive   = false;
$auto_delete = false;

$connection = new AMQPStreamConnection(
    $rabbitmq_host,
    $rabbitmq_port,
    $rabbitmq_user,
    $rabbitmq_pswd,
    $rabbitmq_path
);
$channel = $connection->channel();

$channel->queue_declare($rabbitmq_queu, $passive, $durable, $exclusive, $auto_delete);
$channel->exchange_declare($rabbitmq_excg, $exchange_type, $passive, $durable, $auto_delete);
// No routing key is given here or in basic_publish(), so both use the default.
$channel->queue_bind($rabbitmq_queu, $rabbitmq_excg);

$properties = [
    'content_type'  => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

// ---- send ----
$message = new AMQPMessage($msg, $properties);
$channel->basic_publish($message, $rabbitmq_excg);
// ---- /send ----

$channel->close();
$connection->close();

///////////////////////////////////////////////////////////////////////////
// Consumer: subscribes to the queue and hands every delivery to getMsg()
// until the consumer is cancelled (getMsg() cancels on the body 'quit').
///////////////////////////////////////////////////////////////////////////

$rabbitmq_host = '127.0.0.1';
$rabbitmq_port = 5672;
$rabbitmq_user = 'guest';
$rabbitmq_pswd = 'guest';
$rabbitmq_path = '/';      // virtual host
$rabbitmq_excg = 'test';   // exchange name
$rabbitmq_queu = 'test';   // queue name
$exchange_type = 'direct';
$consumer_tag  = '';       // empty tag is passed as-is to basic_consume()

$passive     = false;
$durable     = true;
$exclusive   = false;
$auto_delete = false;
$no_local    = false;
// getMsg() acknowledges each delivery explicitly, so automatic
// acknowledgement must be off; acking in no_ack mode is a channel error.
$no_ack      = false;
$nowait      = false;

$connection = new AMQPStreamConnection(
    $rabbitmq_host,
    $rabbitmq_port,
    $rabbitmq_user,
    $rabbitmq_pswd,
    $rabbitmq_path
);
$channel = $connection->channel();

$channel->queue_declare($rabbitmq_queu, $passive, $durable, $exclusive, $auto_delete);
$channel->exchange_declare($rabbitmq_excg, $exchange_type, $passive, $durable, $auto_delete);
$channel->queue_bind($rabbitmq_queu, $rabbitmq_excg);

// getMsg is a plain function (not a method), so it is referenced by name.
$channel->basic_consume(
    $rabbitmq_queu,
    $consumer_tag,
    $no_local,
    $no_ack,
    $exclusive,
    $nowait,
    'getMsg'
);

// Keep waiting for deliveries while at least one consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

// ---- receive ----
/**
 * Delivery callback: prints the message body, acknowledges the delivery and
 * cancels the consumer when the body is exactly 'quit'.
 *
 * @param AMQPMessage $msg Delivered message; its delivery_info entries
 *                         'channel', 'delivery_tag' and 'consumer_tag' are read.
 */
function getMsg(AMQPMessage $msg)
{
    // Keep the body in its own variable so $msg stays the message object.
    $body = $msg->body;
    echo $body . PHP_EOL;

    $channel = $msg->delivery_info['channel'];
    $channel->basic_ack($msg->delivery_info['delivery_tag']);

    if ($body === 'quit') {
        $channel->basic_cancel($msg->delivery_info['consumer_tag']);
    }
}
// ---- /receive ----