PHP 对rabbitMQ的详细使用讲解测试(TP5) 版本三 之 路由篇
发布日期:2021-06-30 15:37:11 浏览次数:2 分类:技术文章

本文共 4555 字,大约阅读时间需要 15 分钟。

rabbitMQ 下载官网 https://www.rabbitmq.com/

PHP链接MQ扩展地址 https://github.com/php-amqplib/php-amqplib

或者使用  composer  安装

 composer require php-amqplib/php-amqplib

 

channel->exchange_declare('log_queue', 'fanout', false, false, false); //这里我们创建一个直连交换机 $this->channel->exchange_declare('direct_logs', 'direct', false, false, false); // 查看所有交换机的列表 // 命令: rabbitmqctl list_exchanges // 这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们 //匿名交换机 //在调用上面写的 四个方法的时候 并没有对交换机进行配置 但仍然可以使用消息发送 // 因为我们使用了命名为空的 字符串默认了交换机 第二个参数就是交换机的名称 //$this->channel->basic_publish($msg, '', 'hello'); // 我们这里使用的默认或者匿名交换机 消息将会根据指定的routing_key 发到指定的队列 // routing key是basic_publish函数的第三个参数 第二个参数为交换机的名字 //另外 我们需要把我们的消息也要设置持久化 设置为 delivery_mode = 2 $msg=new AMQPMessage($data,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); //现在,我们就可以发送消息到一个我们命名的交换机: //我们命名了 交换机 第三个参数就不需要再找寻队列了 只限于扇类型的交换机 // $this->channel->basic_publish($msg, 'log_queue'); //现在我们使用直连交换机 要设置第三个参数 //我们先假设“severity”的值是info、warning、error中的一个。 // 这样我们就可以根据 报错的等级不同 推送到路由对应不同的队列进行数据处理了 // 当然在接收者那边 也是需要做调整的 这个参数 根据传进来的值来定 $severity=''; $this->channel->basic_publish($msg, 'direct_logs',$severity); echo " [x] Sent '",$data,"'\n"; //临时队列 // 上面的四个方法中 声明了两个队列名 ( hello和task_queue) //给队列名称很重要 我们需要把工作者指定到正确的队列 //如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的 //这个方法里的程序 看起来跟上面的四个方法没什么区别 唯一的区别就是把消息发送到了交换机上而不是匿名交换机 // //发送完消息后 关闭通道 $this->channel->close(); $this->connection->close(); } //需要工作的接受者 public function log_work(){ echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; // 创建一个交换机 $this->channel->exchange_declare('log_queue', 'fanout', false, false, false); //1.当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。 //我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。 //2.当与消费者(consumer)断开连接的时候,这个队列应当被立即删除 list($queue_name, ,) = $this->channel->queue_declare("", false, false, true, false); // 方法返回时,$queue_name变量包含一个随机生成的RabbitMQ队列名称。例如,类似amq.gen-jzty20brgko-hjmujj0wlg。 //队列交换机绑定 只需工作者队列交换机绑定 发布任务的人 不需要绑定队列 只需要把消息推送的交换机中 只限于扇类型交换机 // $this->channel->queue_bind($queue_name, 'log_queue'); //根据传过来的类型 来确定消息到底推送到哪个队列中 // 这里的 $severities 真正开发中 是接收传进来的值 我这边直接写一个变量 $severities=[]; foreach($severities as $severity) { $this->channel->queue_bind($queue_name, 'direct_logs', $severity); } //绑定(binding)是指交换机(exchange)和队列(queue)的关系。 //可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣 //绑定的时候可以带上一个额外的参数 routing_key //为了避免与$channel::basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定 //做一个解释 // 把消息推送到交换机中或者直接推送到队列中 // 这个方法是在发送者中使用 //basic_publish() 方法 // 三个参数 // 1.发送的内容 2.交换机的名称(为空的话匿名交换机) 3. 路由的名称(routing_key) 找队列 // 如果你起了交换机名称 第三个参数可省略 发送者只需要把消息发送到交换机中 即可 // 交换机与队列的绑定 //这个方法是在接受者中使用 //queue_bind() 方法 // 三个参数 // 1.队列名称 2.交换机名称 3.路由(routing_key)防止跟上面的方法参数混淆 或者起名叫 绑定键(binding_key) //$binding_key = 'black'; //$channel->queue_bind($queue_name, $exchange_name, $binding_key); //绑定键的意义取决于交换机(exchange)的类型。我们之前使用过的扇型交换机(fanout exchanges)会忽略这个值。 //现在我们采用直连交换机类型 Direct exchange //我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。 //我们使用的扇型交换机(fanout exchange)没有足够的灵活性 —— 它能做的仅仅是广播 //我们将会使用直连交换机(direct exchange)来代替。 //路由的算法很简单 —— // 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列 // 这样就根据路由名称来 交换机把消息送到哪个队列中 //多个绑定 //多个队列使用相同的绑定建是合法的 // 一个绑定建可以链接多个队列 这样的话 直连交换机类型 就可以做到 扇形交换机的功能 把消息散播到绑定的队列中 //队列交换机绑定(binding)列表查询 // rabbitmqctl list_bindings //我们要创建一个回调函数来接收发送者发送的消息 //消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候设置的第四个参数basic_consume为false // (true 意味着不响应ack) ,当工作者(worker)完成了任务,就发送一个响应。 $this->channel->basic_consume($queue_name, '', false, false, false, false, 'msg'); //运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。 //当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。 //如果在basic_consume 方法第四个参赛为false 的话 再回调函数里一定要basic_ack // 只要通道注册了回调,就进行循环 while ($this->channel ->is_consuming()) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); }}

转载地址:https://jsonll.blog.csdn.net/article/details/105864256 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:java 数据类型详细讲解
下一篇:PHP 对rabbitMQ的详细使用讲解测试(TP5) 版本二

发表评论

最新留言

关注你微信了!
[***.104.42.241]2024年04月10日 04时39分07秒