php-rabbitmq结合rabbitmq_delayed_message_exchange实现延时队列
发布日期:2021-06-29 14:32:39
浏览次数:3
分类:技术文章
本文共 4023 字,大约阅读时间需要 13 分钟。
#查看插件列表rabbitmq-plugins list#如果未安装,则下载wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip#解压unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip#移动到rabbitmq plugins文件夹中,本机的rabbitmq是docker容器,plugins文件夹位于/pluginsmv rabbitmq_delayed_message_exchange-20171201-3.7.x.ez ./plugins#启用rabbitmq_delayed_message_exchange插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange#查看插件启用情况rabbitmq-plugins list
重启rabbitmq
2,php-rabbitmq(自行composer安装)
parseQueueConfig($queue_name); $host = $queue_config['host']; $port = $queue_config['port']; $username = $queue_config['username']; $password = $queue_config['password']; //创建连接 $this->connection = new AMQPStreamConnection($host, $port, $username, $password); $this->channel = $this->connection->channel(); $this->channel->exchange_declare( $queue_name, //exchange类型为x-delayed-message 'x-delayed-message', false, true, false, false, false, //此处是重点,$argument必须使用new AMQPTable()生成 new AMQPTable([ "x-delayed-type" => 'direct' ]) ); //队列声明 $this->channel->queue_declare($queue_name, false, true, false, false); //队列与exchange绑定 $this->channel->queue_bind($queue_name, $queue_name, $queue_name); } //生成消息 private function createDelayMsg($data) { $this->msg = new AMQPMessage( $data, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //此处是重点,设置延时时间,单位是毫秒 1s=1000ms,实例延迟20s 'application_headers' => new AMQPTable([ 'x-delay' => 20000, ]) ] ); return $this->msg; } //生产者发送消息,代码中只需调用此方法发送消息即可 public function publishDelayMsg($queue_name, $msg_data) { $this->initDelay($queue_name); $msg = $this->createDelayMsg($msg_data); $this->channel->basic_publish($msg, $queue_name, $queue_name); $this->channel->close(); $this->connection->close(); } //消费者,代码中调用消费 public function workerMsg($queue_name, $callback = null) { $this->init($queue_name); $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback); while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); } //常驻内存 消费实例 protected function execute(InputInterface $input, OutputInterface $output) { $queue = new QueueService(); $queue->workerMsg('order_check', function ($msg) { $unique = md5(uniqid(time())); try { logger("[$unique]order check Queue Received[{ $msg->body}] "); $orderData = json_decode($msg->body, true); $orderId = $orderData['id']; $flag = array_get($orderData, 'flag'); $order = new OrderService(); list($code, $errMsg,) = $order->changeOrderStatus($orderId, $flag); if ($code > 0) { //意外错误重新分配 logger("[$unique]order check Queue Exception[{ $code} { $errMsg}] "); $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); } else { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } } catch (\Exception $e) { //意外错误重新分配 logger("[$unique]order check Queue Exception[{ $e->getCode()} { $e->getMessage()}] "); $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); } }); }
转载地址:https://chocolate.blog.csdn.net/article/details/106680582 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月13日 13时01分57秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Go-多路选择和超时控制
2019-04-29
Go-channel的关闭和广播
2019-04-29
Go-任务的取消
2019-04-29
AIX 作为Web Server 使用时,tcp相关的几个参数调整
2019-04-29
自我学习37:请描述一下网页从开始请求到最后展示的完整过程
2019-04-29
自我学习38:如何区分前后端BUG
2019-04-29
自我学习39:接口自动化测试用例&功能测试用例区别
2019-04-29
mirror去兔子补丁下载 附安装教程
2019-04-29
mirror去兔子补丁 v3.0附安装教程
2019-04-29
mirror去兔子补丁为什么还有兔子_mirror去兔子补丁使用教程
2019-04-29
3dmax2012安装教程
2019-04-29
OC渲染器(Octane Render)整合版安装包 附安装教程
2019-04-29
操作系统期末大题复习
2019-04-29
hive:分区表,hbase外表
2019-04-29
想要成为运维,想要成为后期的架构师?这些知识是必备的!
2019-04-29
linux 是如何 快速一键安装禅道的呐?
2019-04-29
运维面试基础试题(四)
2019-04-29
一键安装Openstack单节点 必能成功
2019-04-29
面试紧张怎么办
2019-04-29
关系型数据库 ,nosql数据库简介
2019-04-29