Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)
发布日期:2021-06-30 11:01:54 浏览次数:2 分类:技术文章

本文共 3519 字,大约阅读时间需要 11 分钟。

目录

 

 


 

基本概念

简单队列的不足:

           1. 耦合性高;

           2. 如果生产者把生产队列该了,消费者也要同时改;

 

Work Queues工作队列,模型如下:

 

 

代码与实例

程序运行截图如下:

生产者:

两个消费者如下:

在源码中,消费者一个是1秒消费一次,一个是2秒消费一次,但从我们知道,RabbitMQ给他轮询分发。平均分配

程序结构如下:

Send.java

package work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = ConnectionUtils.getConnect();        Channel channel = connection.createChannel();        //声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        for(int i = 0 ; i < 50; i++){            String msg = "Hello World : " + i;            System.out.println("send msg : " + msg);            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());            Thread.sleep(i * 20);        }        channel.close();        connection.close();    }}

Recv1.java

package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtils.getConnect();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        System.out.println("recv1 running");        //消费者        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String msg = new String(body, "utf-8");                System.out.println("Recv[1] msg is : " + msg);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("Recv[1] done");                }            }        };        channel.basicConsume(QUEUE_NAME, true, consumer);    }}

Recv2.java

package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv2 {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtils.getConnect();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        System.out.println("recv2 running");        //消费者        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String msg = new String(body, "utf-8");                System.out.println("Recv[2] msg is : " + msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("Recv[2] done");                }            }        };        channel.basicConsume(QUEUE_NAME, true, consumer);    }}

源码打包下载:

转载地址:https://it1995.blog.csdn.net/article/details/93325486 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Qt文档阅读笔记-GridLayout QML Type解析与实例
下一篇:Web前端笔记-i标签做小图标以及改源码注意事项

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月13日 08时09分37秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章