PHP 对rabbitMQ的详细使用讲解测试(TP5) 版本一
发布日期:2021-06-30 15:37:09 浏览次数:2 分类:技术文章

本文共 9534 字,大约阅读时间需要 31 分钟。

PHP链接MQ扩展地址 

或者使用  composer  安装

composer require php-amqplib/php-amqplib

 

安装后 进行链接使用 

以下是例子说明

1 //  (2)queue(消息队列载体)持久化,在声明时指定durable => 1 //  (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化) //如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。 //如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 //启动 //在运行RabbitMQ时,打开两个终端, //这个方法来启动消费者 public function customer(){ //交换机名 消息交换机,它指定消息按什么规则,路由到哪个队列。 $exchange = 'change_1'; //队列名 消息队列载体,每个消息都会被投入到一个或多个队列。 $queue = 'queun_1'; //路由的key $routing="routing_1"; //消费者 就是接受消息的程序 $consumerTag = 'consumer_1'; //连接MQ $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '测试项目'); $channel = $connection->channel(); /* name: $queue 创建队列 passive: false 持久durable: true // //队列将在服务器重启后继续存在 互斥exclusive: false // 队列可以通过其他渠道访问 auto_delete: false 通道关闭后,队列不会被删除 */ $channel->queue_declare($queue, false, true, false, false); /* name: $exchange 创建交换机 type: direct passive: false durable: true 持久// 交换器将在服务器重启后继续存在 auto_delete: false //一旦通道关闭,交换器将不会被删除。 */ $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); //绑定交换机与队列,并指定路由键 $channel->queue_bind($queue, $exchange,$routing); /* queue: 从哪里获取消息的队列 consumer_tag: 消费者标识符 no_local: 不接收此使用者发布的消息 no_ack: 如果求设置为true,则此使用者将使用自动确认模式。详情请参见. See https://www.rabbitmq.com/confirms.html for details. exclusive:请独占使用者访问,这意味着只有这个使用者可以访问队列 nowait: callback: :PHP回调 */ $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); register_shutdown_function('shutdown', $channel, $connection); // 只要通道注册了回调,就进行循环 while ($channel ->is_consuming()) { $channel->wait(); } } //然后在另一个终端上做: public function other(){ //交换机名 $exchange = 'change_1'; //队列 // $queue = 'queun_1'; //路由的key $routing="routing_1"; //创建连接 $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '测试项目'); $channel = $connection->channel(); /* name: $queue passive: false 持久 durable: true // //队列将在服务器重启后继续存在 互斥 exclusive: false // 队列可以通过其他渠道访问 auto_delete: false 通道关闭后,队列不会被删除 */ // $channel->queue_declare($queue, false, true, false, false); /* name: $exchange 创建交换机 type: direct passive: false durable: true // 交换器将在服务器重启后继续存在 auto_delete: false //一旦通道关闭,交换器将不会被删除。 */ $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); // $channel->queue_bind($queue, $exchange); //发送消息 date_default_timezone_set("Asia/Shanghai"); //测试一种发送消息方式// $argv=['Json','Hello','My name is Jun'];// $messageBody = implode(' ', array_slice($argv, 0)); // $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));// $channel->basic_publish($message, $exchange); //测试另一种方式 for($i=0; $i<50; ++$i){ sleep(1);//休眠1秒 //消息内容 $messageBody = "Hello,Json Now Time:".date("h:i:sa"); $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, $exchange,$routing); echo "Send Message:". $i."\n"; } $channel->close(); $connection->close();// 您应该看到消息到达另一个终端上的进程// 然后停止消费者,发送给它的退出消息: } //如果您需要监听用于连接RabbitMQ的套接字,那么可以在非阻塞使用者中查看该示例。 public function example(){ $exchange = 'router'; $queue = 'msgs'; $consumerTag = 'consumer'; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '测试项目'); $channel = $connection->channel(); /* The following code is the same both in the consumer and the producer. In this way we are sure we always have a queue to consume from and an exchange where to publish messages. */ /* name: $queue passive: false durable: true // the queue will survive server restarts exclusive: false // the queue can be accessed in other channels auto_delete: false //the queue won't be deleted once the channel is closed. */ $channel->queue_declare($queue, false, true, false, false); /* name: $exchange type: direct passive: false durable: true // the exchange will survive server restarts auto_delete: false //the exchange won't be deleted once the channel is closed. */ $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($queue, $exchange); /* queue: Queue from where to get the messages consumer_tag: Consumer identifier no_local: Don't receive messages published by this consumer. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details. exclusive: Request exclusive consumer access, meaning only this consumer can access the queue nowait: callback: A PHP Callback */ $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); register_shutdown_function('shutdown', $channel, $connection);// Loop as long as the channel has callbacks registered while ($channel->is_consuming()) { $channel->wait(null, true); // do something else usleep(300000); } } //amqp_ha_consumer.php:演示镜像队列的使用 //amqp_consumer_exclusive.php 和 amqp_publisher_exclusive.php 演示使用排他队列展开交换。 //amqp_consumer_fanout_ {1,2}.php和amqp_publisher_fanout.php 演示使用指定队列展开交换。 //basic_get.php 演示通过使用基本的get AMQP调用从队列中获取消息。 //队列的使用 public function queue(){ $exchange = 'router'; $queue = 'haqueue'; $specificQueue = 'specific-haqueue'; $consumerTag = 'consumer'; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '测试项目'); $channel = $connection->channel(); /* 下面的代码在消费者和生产者中都是相同的。 通过这种方式,我们可以确保始终有一个队列来使用from和an 交换在何处发布消息。 */ $haConnection = new AMQPTable(array('x-ha-policy' => 'all')); $haSpecificConnection = new AMQPTable(array( 'x-ha-policy' => 'nodes', 'x-ha-policy-params' => array( 'rabbit@127.0.0.1' , 'hare@127.0.0.1' ), )); /* name: $queue passive: false durable: true // 队列将在服务器重启后继续存在 exclusive: false // /队列可以通过其他渠道访问 auto_delete: false //通道关闭后,队列不会被删除。 nowait: false // 不会等待某些事情的回复 parameters: array // 如何向队列声明发送特定的额外数据 */ $channel->queue_declare($queue, false, false, false, false, false, $haConnection); $channel->queue_declare($specificQueue, false, false, false, false, false, $haSpecificConnection); /* name: $exchange type: direct 直接 passive: false durable: true // the exchange will survive server restarts /交换将在服务器重启后仍然有效 auto_delete: false //一旦通道关闭,交换器将不会被删除。 */ $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($queue, $exchange); $channel->queue_bind($specificQueue, $exchange); /* queue: 从哪里获取消息的队列 consumer_tag:消费者标识符 no_local: 不接收此使用者发布的消息。 no_ack: 如果设置为true,则此使用者将使用自动确认模式。详情请参见. See https://www.rabbitmq.com/confirms.html. exclusive: 请求独占使用者访问,这意味着只有这个使用者可以访问队列 nowait: callback: A PHP Callback */ $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); register_shutdown_function('shutdown', $channel, $connection); // Loop as long as the channel has callbacks registered while ($channel->is_consuming()) { $channel->wait(); } }}

 

 

回调函数

// +----------------------------------------------------------------------// 应用公共文件/** * @param \PhpAmqpLib\Message\AMQPMessage $message */function process_message($message){ echo "\n--------\n"; echo $message->body; echo "\n--------\n"; $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 发送一个带有“quit”字符串的消息来取消消费者。 if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); }}/** * @param \PhpAmqpLib\Channel\AMQPChannel $channel * @param \PhpAmqpLib\Connection\AbstractConnection $connection */function shutdown($channel, $connection){ $channel->close(); $connection->close();}

转载地址:https://jsonll.blog.csdn.net/article/details/105599507 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:PHP 对rabbitMQ的详细使用讲解测试(TP5) 版本二
下一篇:laravel框架 路由 中间件 交互

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月22日 21时34分52秒