
RabbitMQ学习笔记——交换机(Exchange)详解
发布日期:2021-05-04 18:18:28
浏览次数:28
分类:技术文章
本文共 7275 字,大约阅读时间需要 24 分钟。
概念
Exchange:接收消息,并根据路由键转发消息至所绑定的队列中

交换机的属性
Name
:交换机名称Type
:交换机类型direct、topic、fanout、headersDurability
:是否需要持久化,true表示持久化Auto Delete
:当最后一个绑定到Exchange上的队列删除后,自动删除该ExchangeInternal
:当前Exchange是否用于RabbitMQ内部使用,默认为FalseArguments
:扩展参数,用于扩展AMQP协议自制定化使用
交换机的类型
1)Direct Exchange
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue ( 注:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。)
代码实现:
public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.43.157"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //5 发送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); //6 关闭连接 channel.close(); connection.close(); } }
消费端
public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.43.157"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); // 1.设置是否自动重连(网络闪断时) 2.每3秒重连一次 connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 //参数分别为:name,type,durable,autoDelete,internal,arguments channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 //参数分别为:name,durable,exclusive,autoDelete,arguments channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系(exchange和queue) channel.queueBind(queueName, exchangeName, routingKey); //细节一:同一个队列可以绑定多个值 channel.queueBind(queueName, exchangeName, "666"); //细节二:不同队列可以绑定相同的RoutingKey String queueName2 = "test_direct_queue2"; channel.queueDeclare(queueName2, false, false, false, null); channel.queueBind(queueName2, exchangeName, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //监听多个队列 channel.basicConsume(queueName2, true, consumer); // 循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } }}
然后启动生产端,此时消费端控制台进行了打印,共消费了两条消息,说明监听的两个队列都接收到了消息。
收到消息:Hello World RabbitMQ 4 Direct Exchange Message ... 收到消息:Hello World RabbitMQ 4 Direct Exchange Message ...
当我们修改生产端的routingKey值为:666时,那么只有队列一接收到消息并被消费者消费。
如果修改值为:test.direct111,此时在启动生产端,消费端就收不到消息了,这就是直连的方式。2)Topic Exchange
所有发送到 Topic Exchange 的消息被转发到所有关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic注意:可以使用通配符进行模糊匹配
//省略创建连接和通道... //4 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 发送 String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); //关闭连接...
消费端
//省略创建连接和通道... //4 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.#"; // 声明Topic Exchange channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 建立交换机和队列的绑定关系 channel.queueBind(queueName, exchangeName, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息...
运行说明:先启动消费端,同样可以在管控台可以看到我们新声明的exchange以及它的绑定队列,这里不再细说。然后启动生产端,此时消费端控制台进行了打印,共消费了三条消息,说明三条消息都和队列指定的Topic匹配上了,因为使用的是 # 匹配
收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
将消费端指定的RoutingKey进行修改:routingKey = "user.*";
user.delete.abc
没有匹配上,因为使用的是 *
匹配,这就是 Topic Exchange 的路由方式。 3)Fanout Exchange
- 不处理路由键,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的(性能最好)
//省略创建连接和通道... //4 声明 String exchangeName = "test_fanout_exchange"; //5 发送 for(int i = 0; i < 5; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; //不设置路由键或者设置任意内容均可 channel.basicPublish(exchangeName, "", null , msg.getBytes()); } //关闭连接...
消费端:声明一个Fanout Exchange,声明队列,建立交换机和队列的绑定关系,绑定关系不使用RoutingKey
//省略创建连接和通道... //4 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不设置路由键 //声明交换机、队列、绑定关系 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息...
先启动消费端,然后启动生产端,消息被成功消费,这种就是Fanout Exchange,它不走任何的路由规则,直接将消息路由到所有与它绑定的队列。
收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
发表评论
最新留言
很好
[***.229.124.182]2025年03月13日 20时58分36秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
导入工程时出现错误
2019-03-01
BCGControlBar教程:应用向导
2019-03-01
MyEclipse教程:Web开发——部署并测试项目
2019-03-01
【更新】CLion v2018.3发布(六):VCS和插件
2019-03-01
文件服务器——src文件夹
2019-03-01
从零构建通讯器--5.2三次握手,telnet,wireshark
2019-03-01
如何判断两个浮点数是否相等?
2019-03-01
2021牛客寒假算法基础集训营3
2019-03-01
营收环比增幅近50%,星巴克在经历“劫”后重生吗?
2019-03-01
苹果进军搜索,背后藏着什么“阳谋”?
2019-03-01
egg:如何在控制器中拿到前端传的参数
2019-03-01
MVC之修改
2019-03-01
struct 模块
2019-03-01
python之面向对象编程
2019-03-01
python之集合类型内置方法
2019-03-01
编程与编程语言分类
2019-03-01