
本文共 2917 字,大约阅读时间需要 9 分钟。
Spring Cloud Stream + RabbitMQ 实践与问题解决
1. 概述
本文主要记录了使用 Spring Cloud Stream 以 RabbitMQ 作为消息队列系统的实践过程,涵盖了从配置到代码开发、测试以及问题解决的全过程。
2. 项目依赖管理
项目依赖主要基于 Spring Boot 1.5.2 和 Spring Cloud Dalston.RELEASE 版本,具体依赖如下:
父亲依赖:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent>
主要依赖:
<groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId>
3. 应用配置(application.properties)
spring.application.name=rabbitmq-helloserver.port=8080spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=rootspring.rabbitmq.password=root
注意事项:如果不编写 application.properties
,默认用户名和密码为 guest/guest
。
4. 代码开发
4.1 消息发送者接口
public interface MsgSender { @Output(Sink.INPUT) MessageChannel sendMsg();}
4.2 消息接收者
@EnableBinding(value = {Sink.class, MsgSender.class})public class MsgReceiver { @StreamListener(Sink.INPUT) public void receiverMsg(Object msg) { System.out.println("消息接收者接收消息: " + msg); }}
4.3 入口类
@RestController@SpringBootApplicationpublic class StreamRabbitApplication { @Autowired private MsgSender msgSender; @RequestMapping("/send") public String send(String msg) { msgSender.sendMsg().send(MessageBuilder.withPayload(msg).build()); return "消息发送成功: " + msg; } public static void main(String[] args) { springApplication.run(StreamRabbitApplication.class, args); }}
5. 测试与问题发现
5.1 首次测试
使用 http://localhost:8080/send?msg=你好
发起请求,测试成功。
问题回报:第二次发送其他消息时,接收方出现不可访问错误。
解决方案:通过长时间测试发现消息丢失问题,后台频繁报错。
6. 消息转换测试
6.1 创建消息类
public class Message { private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "Msg [msg=" + msg + "]"; } public Message(String msg) { super(); this.msg = msg; } public Message() { super(); }}
6.2 修改发送和接收逻辑
@EnableBinding(value = {Sink.class, MsgSender.class})public class MsgReceiver { @StreamListener(Sink.INPUT) @SendTo(value = Source.OUTPUT) public String receiverMsg(Message msg) { System.out.println("消息接收者接收消息: " + msg); return msg.getMsg() + " modify by time "; }}
6.3 创建处理队列的类
@EnableBinding(value = Source.class)public class MsgReceiverLast { @StreamListener(Source.OUTPUT) public void handlerEnd(String msg) { System.out.println("last 接收消息: " + msg); }}
7. 转发功能测试
7.1 测试结果
通过测试发现,消息能够成功转发并被 MsgReceiverLast
接收。
8. 总结
RabbitMQ 配置简单直观:通过 @Input
和 @Output
指定消息队列,消息提供者需返回 MessageChannel
,消息处理者需指定队列。
绑定与监听:使用 @EnableBinding
绑定类,结合 @StreamListener
指定队列,实现消息监听和处理。
消息丢失与后台报错:通过长时间测试发现消息丢失问题,后台频繁报错。
消息转发实现:使用 @SendTo
将处理结果转发到其他队列,供其他消费者处理。
以上内容均为个人实践经验,欢迎交流与补充!
发表评论
最新留言
关于作者
