Spring Cloud Stream Mq
发布日期:2021-05-20 02:05:37 浏览次数:18 分类:精选文章

本文共 2118 字,大约阅读时间需要 7 分钟。

Spring Cloud Stream 和 RabbitMQ 实战指南

Spring Cloud Stream 概念

Spring Cloud Stream 是 Spring Cloud 旗下一个基于消息中间件的流处理平台,核心概念包括 BinderBinding,帮助开发者无需直接与底层消息中间件打交道。

Binder

Binder 是用于将消息中间件集成到 Spring Cloud Stream 中的组件。各大消息中间件如 Kafka、RabbitMQ 和 RocketMQ 都有其专门的 Binder 实现:

  • Kafka:通过 KafkaMessageChannelBinder 实现
  • RabbitMQ:通过 RabbitMessageChannelBinder 实现
  • RocketMQ:通过 RocketMQMessageChannelBinder 实现

Binding

Binding 又分为 Input BindingOutput Binding,其作用是在消息中间件与应用程序的提供者和消费者之间提供桥梁。通过 Binding,开发者可以直接使用自定义的生产者或消费者进行数据交换,而无需直接关注底层消息中间件的复杂性。


RabbitMQ 项目实战

基于 RabbitMQ 实现的 Spring Cloud Stream 项目涉及以下关键配置和操作。

Maven 依赖集成

在 Maven 项目中引入 RabbitMQ 相关依赖,实现 RabbitMQ 的自动配置:

org.springframework.cloud
spring-cloud-starter-stream-rabbit

Nacos 配置中心配置

RabbitMQ 的配置可以通过 Nacos 配置中心进行管理,配置文件示例如下:

spring:
rabbitmq:
host: hcp-rabbitmq
port: 5672
username: guest
password: guest

消息生产者配置

通过 @Output 声明消息生产者,指定输出绑定名和目的地配置:

spring:
cloud:
stream:
bindings:
device-output:
destination: DEVICE-TOPIC-TEXT-01
content-type: application/json

消息消费者配置

通过 @StreamListener 声称消息消费者,添加相关消费者配置:

spring:
cloud:
stream:
bindings:
device-input:
destination: DEVICE-TOPIC-TEXT-01
content-type: application/json
group: DEVICE-consumer-group-TOPIC-01
consumer:
max-attempts: 3
back-off-initial-interval: 3000
back-off-multiplier: 2.0
back-off-max-interval: 10000

死信队列处理

针对消息消费失败的情况,可以通过以下方式配置死信队列的处理:

@ServiceActivator(inputChannel = "DEVICE-TOPIC-TEXT-01.DEVICE-consumer-group-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
}
上一篇:JVM - GC基础
下一篇:灰度发布系统架构设计

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年05月15日 17时01分29秒