RabbitMQ --- 六种工作模式(一)
发布日期:2021-05-20 01:12:09 浏览次数:19 分类:精选文章

本文共 6966 字,大约阅读时间需要 23 分钟。

RabbitMQ(一)- 协议与工作模式

RabbitMQ 是一款强大的消息中间件软件,它被广泛应用于分布式系统中,用于解决消息的高效传输和处理问题。本文将深入探讨 RabbitMQ 的安装配置及其核心工作模式。

一、RabbitMQ 的六种工作模式

RabbitMQ 提供了六种不同的工作模式,适用于不同的业务场景。以下是对每种模式的详细介绍。

1. 简单模式(Simple Message Pattern)

简单模式最基础,适合传递单一消息。消息生产者(Producer)通过发布到交换机(Exchange),消息接收者(Consumer)从交换机获取消息。这种模式的特点是简单直接,但没有消息持久化机制,消息可能会丢失。

生产者代码示例(Java)

public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("helloworld");
channel.basicPublish("", "helloworld", null, "hello".getBytes());
channel.close();
connection.close();
}
}

消费者代码示例(Java)

public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("helloworld");
channel.basicConsume("helloworld", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws Exception {
System.out.println("收到消息:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws Exception {
// 消费者关闭会触发回调
}
});
}
}

2. 工作队列模式(Message Queue Pattern)

工作队列模式适用于需要并发处理任务的场景。生产者将任务封装为消息发送到队列,后台worker进程执行任务。队列支持多个消费者同时读取消息,消息会自动分发给空闲的消费者,确保任务平衡分配。

生产者代码示例(Java)

public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("helloworld", false, false, false, null);
while (true) {
System.out.println("请输入消息:");
String msg = new Scanner(System.in).nextLine();
channel.basicPublish("", "helloworld", null, msg.getBytes());
}
}
}

消费者代码示例(Java)

public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("helloworld");
channel.basicConsume("helloworld", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws Exception {
System.out.println("收到消息:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws Exception {
// 消费者关闭触发回调
}
});
}
}

3. 发布/订阅模式(Publish-Subscribe Pattern)

发布/订阅模式适用于需要多对多消息分发的场景。消息发布者(Publisher)发送消息给交换机(Exchange),消息接收者(Subscriber)通过订阅主题(Topic)从交换机获取消息。这种模式的特点是消息被广播至所有订阅者。

生产者代码示例(Java)

public class Publisher {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
while (true) {
System.out.println("请输入消息:");
String msg = new Scanner(System.in).nextLine();
channel.basicPublish("logs", "", null, msg.getBytes());
}
}
}

消费者代码示例(Java)

public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.126.129");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws Exception {
System.out.println("收到消息:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws Exception {
// 消费者关闭触发回调
}
});
}
}

二、工作模式详解

1. 消息持久化(Message Persistence)

在 RabbitMQ 中,消息持久化可以确保消息在服务重新启动后仍然存在。需要注意的是,仅队列和消息都需要配置为持久化(Durable)。

队列持久化设置

// 定义持久化队列
channel.queueDeclare("task_queue", true, false, false, null);

消息持久化设置

// 发送持久化消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

2. 消息确认(Ack)

默认情况下,消息在成功接收后会自动删除(Auto-Delete)。为了保证消息不丢失,需要手动确认消息(Ack)。如果消费者遇到异常或提前退出,消息会被重新放回队列,等待其他消费者处理。

消费者设置Ack

//消费者代码中设置Ack
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);

消费者在处理消息后,通过调用 channel.basicAck() 手动确认消息。

3. 消息重回队列(Rearment Queue)

如果消费者无法处理消息(如发生异常),可以选择将消息重回队列,以便其他消费者处理。

Consumer 处理异常示例

public class Consumer {
public static void main(String[] args) throws Exception {
...
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws Exception {
try {
System.out.println("收到消息:" + new String(message.getBody()));
// 处理成功,手动确认消息
channel.basicAck(message.getMessageId());
} catch (Exception e) {
// 处理失败,将消息重回队列
channel.basicNack(message.getMessageId(), true);
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
// 消费者关闭
}
});
}
}

4. FIFO 和 QoS 模式

RabbitMQ 提供了 FIFO(First-In, First-Out)队列和 QoS(Quality of Service)机制,以控制消息的传输速度和 Traffic。

QoS 配置示例

// 生产者发送QoS消息
channel.basicPublish("", "qos_queue", null, msg.getBytes(), null, 1, 1000);

在这种情况下,最多有1条消息同时传输,每条消息最多占用1000ms的带宽。

三、RabbitMQ 的优势

  • 灵活性:支持多种工作模式,适用于不同场景。
  • 可扩展性:通过增加消费者,任务处理能力可以线性扩展。
  • 高可用性:消息持久化和消息确认机制确保消息不丢失。

通过以上内容,读者可以全面理解 RabbitMQ 的工作原理及其应用场景。

上一篇:Spring Cloud --- Turbine使用
下一篇:order service 调用商品库存服务和用户服务

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年04月17日 14时00分06秒