Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)
发布日期:2021-06-30 11:01:54
浏览次数:2
分类:技术文章
本文共 3519 字,大约阅读时间需要 11 分钟。
目录
基本概念
简单队列的不足:
1. 耦合性高;
2. 如果生产者把生产队列该了,消费者也要同时改;
Work Queues工作队列,模型如下:
代码与实例
程序运行截图如下:
生产者:
两个消费者如下:
在源码中,消费者一个是1秒消费一次,一个是2秒消费一次,但从我们知道,RabbitMQ给他轮询分发。平均分配
程序结构如下:
Send.java
package work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for(int i = 0 ; i < 50; i++){ String msg = "Hello World : " + i; System.out.println("send msg : " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); } channel.close(); connection.close(); }}
Recv1.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("recv1 running"); //消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[1] msg is : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[1] done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); }}
Recv2.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("recv2 running"); //消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[2] msg is : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[2] done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); }}
源码打包下载:
转载地址:https://it1995.blog.csdn.net/article/details/93325486 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月13日 08时09分37秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
UTM GTM
2019-04-30
java是如何写入文件的
2019-04-30
conda 安装
2019-04-30
redis cli
2019-04-30
redis api
2019-04-30
flink physical partition
2019-04-30
java 解析json
2019-04-30
java http请求
2019-04-30
tensorflow 数据格式
2019-04-30
tf rnn layer
2019-04-30
常用中间件
2019-04-30
tf input layer
2019-04-30
tf model create
2019-04-30
tf dense layer两种创建方式的对比和numpy实现
2019-04-30
tf initializer
2019-04-30
tf 从RNN到BERT
2019-04-30
tf keras SimpleRNN源码解析
2019-04-30
tf keras Dense源码解析
2019-04-30
tf rnn输入输出的维度和权重的维度
2019-04-30
检验是否服从同一分布
2019-04-30