Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:01:57
浏览次数:3
分类:技术文章
本文共 3942 字,大约阅读时间需要 13 分钟。
目录
基本概念
当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理。
主要是让消费者处理完后,回信息给RabbitMQ,然后RabbitMQ才会发送下一个。
使用basicQos(perfetch = 1)
注意:使用公平分发必须关闭自动应答ack改成手动。
代码与实例
程序运行截图如下:
生产者:
两个消费者,其中一个的效率是另外一个的2倍(X2的效率)
源码如下:
Send.java
package work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个 //限制发送给一个消费者不超过一条 int prefetchCount = 1; channel.basicQos(prefetchCount); for(int i = 0; i < 50; i++){ String msg = "Hello World : " + i; System.out.println("send msg : " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); } channel.close(); connection.close(); }}
Recv1.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //保证一次只发一个 channel.basicQos(1); System.out.println("recv1 running"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[1] msg is : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[1] done"); /*关闭自动应答,手动处理*/ channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); }}
Recv2.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println("recv2 running"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[2] msg is : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[1] done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); }}
源码下载地址:
转载地址:https://it1995.blog.csdn.net/article/details/93596563 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
网站不错 人气很旺了 加油
[***.192.178.218]2024年04月22日 10时35分40秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
【深度学习笔记】文本分类
2019-04-30
【转载】炼丹实验室:深度学习网络调参技巧
2019-04-30
【论文阅读笔记】文本分类论文汇总
2019-04-30
【NLP学习笔记】One-hot encoding:独热编码
2019-04-30
【工具使用】CSDN编辑器markdown字体、颜色与字号的设置
2019-04-30
【NLP学习笔记】词共现矩阵
2019-04-30
【NLP学习笔记】NLP基础知识框架图
2019-04-30
【深度学习笔记】卷积的输入输出的通道、维度或尺寸变化过程
2019-04-30
【NLP学习笔记】训练集、验证集和测试集的概念及划分
2019-04-30
【NLP学习笔记】conda换源
2019-04-30
【深度学习笔记】标准卷积
2019-04-30
【深度学习笔记】组卷积
2019-04-30
【深度学习笔记】循环神经网络和递归神经网络区别
2019-04-30
【学习笔记】英文科技论文常见英语句式积累
2019-04-30
【深度学习笔记】PixelShuffle
2019-04-30
【python3学习笔记】斜杠和双斜杠运算符的区别
2019-04-30