
本文共 6966 字,大约阅读时间需要 23 分钟。
RabbitMQ(一)- 协议与工作模式
RabbitMQ 是一款强大的消息中间件软件,它被广泛应用于分布式系统中,用于解决消息的高效传输和处理问题。本文将深入探讨 RabbitMQ 的安装配置及其核心工作模式。
一、RabbitMQ 的六种工作模式
RabbitMQ 提供了六种不同的工作模式,适用于不同的业务场景。以下是对每种模式的详细介绍。
1. 简单模式(Simple Message Pattern)
简单模式最基础,适合传递单一消息。消息生产者(Producer)通过发布到交换机(Exchange),消息接收者(Consumer)从交换机获取消息。这种模式的特点是简单直接,但没有消息持久化机制,消息可能会丢失。
生产者代码示例(Java):
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("helloworld"); channel.basicPublish("", "helloworld", null, "hello".getBytes()); channel.close(); connection.close(); }}
消费者代码示例(Java):
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("helloworld"); channel.basicConsume("helloworld", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws Exception { System.out.println("收到消息:" + new String(message.getBody())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws Exception { // 消费者关闭会触发回调 } }); }}
2. 工作队列模式(Message Queue Pattern)
工作队列模式适用于需要并发处理任务的场景。生产者将任务封装为消息发送到队列,后台worker进程执行任务。队列支持多个消费者同时读取消息,消息会自动分发给空闲的消费者,确保任务平衡分配。
生产者代码示例(Java):
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("helloworld", false, false, false, null); while (true) { System.out.println("请输入消息:"); String msg = new Scanner(System.in).nextLine(); channel.basicPublish("", "helloworld", null, msg.getBytes()); } }}
消费者代码示例(Java):
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("helloworld"); channel.basicConsume("helloworld", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws Exception { System.out.println("收到消息:" + new String(message.getBody())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws Exception { // 消费者关闭触发回调 } }); }}
3. 发布/订阅模式(Publish-Subscribe Pattern)
发布/订阅模式适用于需要多对多消息分发的场景。消息发布者(Publisher)发送消息给交换机(Exchange),消息接收者(Subscriber)通过订阅主题(Topic)从交换机获取消息。这种模式的特点是消息被广播至所有订阅者。
生产者代码示例(Java):
public class Publisher { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); while (true) { System.out.println("请输入消息:"); String msg = new Scanner(System.in).nextLine(); channel.basicPublish("logs", "", null, msg.getBytes()); } }}
消费者代码示例(Java):
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.126.129"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws Exception { System.out.println("收到消息:" + new String(message.getBody())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws Exception { // 消费者关闭触发回调 } }); }}
二、工作模式详解
1. 消息持久化(Message Persistence)
在 RabbitMQ 中,消息持久化可以确保消息在服务重新启动后仍然存在。需要注意的是,仅队列和消息都需要配置为持久化(Durable)。
队列持久化设置:
// 定义持久化队列channel.queueDeclare("task_queue", true, false, false, null);
消息持久化设置:
// 发送持久化消息channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
2. 消息确认(Ack)
默认情况下,消息在成功接收后会自动删除(Auto-Delete)。为了保证消息不丢失,需要手动确认消息(Ack)。如果消费者遇到异常或提前退出,消息会被重新放回队列,等待其他消费者处理。
消费者设置Ack:
//消费者代码中设置Ackchannel.basicConsume(queueName, false, deliverCallback, cancelCallback);
消费者在处理消息后,通过调用 channel.basicAck()
手动确认消息。
3. 消息重回队列(Rearment Queue)
如果消费者无法处理消息(如发生异常),可以选择将消息重回队列,以便其他消费者处理。
Consumer 处理异常示例:
public class Consumer { public static void main(String[] args) throws Exception { ... channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws Exception { try { System.out.println("收到消息:" + new String(message.getBody())); // 处理成功,手动确认消息 channel.basicAck(message.getMessageId()); } catch (Exception e) { // 处理失败,将消息重回队列 channel.basicNack(message.getMessageId(), true); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { // 消费者关闭 } }); }}
4. FIFO 和 QoS 模式
RabbitMQ 提供了 FIFO(First-In, First-Out)队列和 QoS(Quality of Service)机制,以控制消息的传输速度和 Traffic。
QoS 配置示例:
// 生产者发送QoS消息channel.basicPublish("", "qos_queue", null, msg.getBytes(), null, 1, 1000);
在这种情况下,最多有1条消息同时传输,每条消息最多占用1000ms的带宽。
三、RabbitMQ 的优势
- 灵活性:支持多种工作模式,适用于不同场景。
- 可扩展性:通过增加消费者,任务处理能力可以线性扩展。
- 高可用性:消息持久化和消息确认机制确保消息不丢失。
通过以上内容,读者可以全面理解 RabbitMQ 的工作原理及其应用场景。
发表评论
最新留言
关于作者
