Spring Cloud Netflix之Stream的简单Demo使用(使用RabbitMQ测试)
发布日期:2021-05-07 08:42:51 浏览次数:23 分类:精选文章

本文共 2917 字,大约阅读时间需要 9 分钟。

Spring Cloud Stream + RabbitMQ 实践与问题解决

1. 概述

本文主要记录了使用 Spring Cloud Stream 以 RabbitMQ 作为消息队列系统的实践过程,涵盖了从配置到代码开发、测试以及问题解决的全过程。

2. 项目依赖管理

项目依赖主要基于 Spring Boot 1.5.2 和 Spring Cloud Dalston.RELEASE 版本,具体依赖如下:

  • 父亲依赖:<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent>

  • 主要依赖:

    • <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version>
    • <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId>

3. 应用配置(application.properties)

spring.application.name=rabbitmq-helloserver.port=8080spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=rootspring.rabbitmq.password=root

注意事项:如果不编写 application.properties,默认用户名和密码为 guest/guest

4. 代码开发

4.1 消息发送者接口

public interface MsgSender {    @Output(Sink.INPUT)    MessageChannel sendMsg();}

4.2 消息接收者

@EnableBinding(value = {Sink.class, MsgSender.class})public class MsgReceiver {    @StreamListener(Sink.INPUT)    public void receiverMsg(Object msg) {        System.out.println("消息接收者接收消息: " + msg);    }}

4.3 入口类

@RestController@SpringBootApplicationpublic class StreamRabbitApplication {    @Autowired    private MsgSender msgSender;    @RequestMapping("/send")    public String send(String msg) {        msgSender.sendMsg().send(MessageBuilder.withPayload(msg).build());        return "消息发送成功: " + msg;    }    public static void main(String[] args) {        springApplication.run(StreamRabbitApplication.class, args);    }}

5. 测试与问题发现

5.1 首次测试

使用 http://localhost:8080/send?msg=你好 发起请求,测试成功。

问题回报:第二次发送其他消息时,接收方出现不可访问错误。

解决方案:通过长时间测试发现消息丢失问题,后台频繁报错。

6. 消息转换测试

6.1 创建消息类

public class Message {    private String msg;    public String getMsg() {        return msg;    }    public void setMsg(String msg) {        this.msg = msg;    }    @Override    public String toString() {        return "Msg [msg=" + msg + "]";    }    public Message(String msg) {        super();        this.msg = msg;    }    public Message() {        super();    }}

6.2 修改发送和接收逻辑

@EnableBinding(value = {Sink.class, MsgSender.class})public class MsgReceiver {    @StreamListener(Sink.INPUT)    @SendTo(value = Source.OUTPUT)    public String receiverMsg(Message msg) {        System.out.println("消息接收者接收消息: " + msg);        return msg.getMsg() + " modify by time ";    }}

6.3 创建处理队列的类

@EnableBinding(value = Source.class)public class MsgReceiverLast {    @StreamListener(Source.OUTPUT)    public void handlerEnd(String msg) {        System.out.println("last 接收消息: " + msg);    }}

7. 转发功能测试

7.1 测试结果

通过测试发现,消息能够成功转发并被 MsgReceiverLast 接收。

8. 总结

  • RabbitMQ 配置简单直观:通过 @Input@Output 指定消息队列,消息提供者需返回 MessageChannel,消息处理者需指定队列。

  • 绑定与监听:使用 @EnableBinding 绑定类,结合 @StreamListener 指定队列,实现消息监听和处理。

  • 消息丢失与后台报错:通过长时间测试发现消息丢失问题,后台频繁报错。

  • 消息转发实现:使用 @SendTo 将处理结果转发到其他队列,供其他消费者处理。

  • 以上内容均为个人实践经验,欢迎交流与补充!

    上一篇:Jackson之转化字符出现的一个奇怪的问题
    下一篇:SpringBoot之RabbitMQ的简单使用的Demo

    发表评论

    最新留言

    感谢大佬
    [***.8.128.20]2025年04月14日 15时00分44秒