
干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型
发布日期:2021-05-20 08:01:02
浏览次数:23
分类:精选文章
本文共 5157 字,大约阅读时间需要 17 分钟。
RabbitMQ专栏直通车
前言
欢迎来到RabbitMQ专栏的第二篇内容。本文将详细介绍RabbitMQ的MANUAL手动确认消费模型,结合代码实战,帮助你理解在实际项目中如何实现手动确认消费的步骤。
RabbitMQ的核心组件
在开始MANUAL手动确认消费之前,首先回顾RabbitMQ的核心组件:
- 生产者:负责将消息发送到RabbitMQ消息队列。
- 消费者:接收来自队列的消息并进行处理。
- 消息:以二进制数据流形式传输,内容可以是文字、图片等。
- 队列:消息的临时存储区,起到中转站的作用。
- 交换机:消息的中转站,用于分发消息。
- 路由:类似网关,用于将消息指定到特定的队列。
MANUAL手动确认消费模型
MANUAL手动确认消费意味着在消费者接收到消息后,需要手动通过代码发送ACK反馈给RabbitMQ服务器。这样消息才会从队列中被移除。
前期准备:引入相关依赖
使用Spring Boot启动项目,引入RabbitMQ相关依赖:
org.springframework.boot spring-boot-starter-amqp 1.3.3.RELEASE
代码实战:配置RabbitMQ
在application.properties
中添加RabbitMQ配置:
spring: rabbitmq: virtual-host: / host: 127.0.0.1 port: 5672 username: guest password: guest
配置确认消费模型
实战配置说明:
@Configurationpublic class ManualMqConfig { private static final Logger log = LoggerFactory.getLogger(ManualMqConfig.class); @Autowired private Environment environment; @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private ManualConsumer manualConsumer; @Bean("manualQueueOne") public Queue manualQueueOne() { return new Queue(getEnvironmentVariable("mq.yzy.info.manualqueue.name"), true); } @Bean public DirectExchange manualExchange() { return new DirectExchange(getEnvironmentVariable("mq.yzy.info.manualexchange.name"), true, false); } @Bean public Binding basicBindingOne() { return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(getEnvironmentVariable("mq.yzy.info.manualrouting.key.name")); } @Bean("manualListenerContainer") public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue) { SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer(); factory.setConnectionFactory(cachingConnectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setQueues(manualQueue); factory.setMessageListener(manualConsumer); return factory; }}
配置消息持久化
确保消息持久化配置正确:
public class ManualMqConfig { // ... @Bean 公uisse exchange配置类似于上述代码。 // 消费者配置 @Component("manualConsumer") public class ManualConsumer implements ChannelAwareMessageListener { @Autowired private ObjectMapper objectMapper; @Override public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); long tag = messageProperties.getDeliveryTag(); try { byte[] msg = message.getBody(); Student student = objectMapper.readValue(msg, Student.class); log.info("基于MANUAL机制-确认消息模式-人为手动确定消费-监听到消息:{}", objectMapper.writeValueAsString(student)); channel.basicAck(tag, true); } catch (Exception e) { log.error("确认消息模式-人为手动确定消费-发生异常:", e.fillInStackTrace()); channel.basicReject(tag, false); } } }
生产者配置
实现消息发送:
@Componentpublic class ManualPublisher { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; @Autowired private ObjectMapper objectMapper; private final String EXCHANGE = "mq.yzy.info.manualexchange.name"; private final String ROUTING_KEY = "mq.yzy.info.manualrouting.key.name"; public void sendAutoMsg(Student student) { try { if (student != null) { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty(EXCHANGE)); Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(student)).build(); rabbitTemplate.convertAndSend(ROUTING_KEY, msg, new MessagePostProcessor() { public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } }); log.info("基于MANUAL机制 - 生产者发出消息:{}", objectMapper.writeValueAsString(student)); } } catch (Exception e) { log.error("基于MANUAL机制 - 生产者发出消息-发生异常:{}", student, e.fillInStackTrace()); } }
测试接口
通过 Swagger 测试接口:
@RestControllerpublic class ManualController { @Autowired private ManualPublisher manualPublisher; @PostMapping("/manual") public ResponseBody manual(@RequestBody @Valid Student vo) throws JsonProcessingException { long startTime = init(objectMapper.writeValueAsString(vo)); try { manualPublisher.sendAutoMsg(vo); endLog(msgValue, startTime); } catch (Exception e) { endLogError(msgValue, startTime, e); } return ResponseBody.ok(); }}
代码公开
完整代码可在GitHub找到:
https://github.com/yangzhenyu07/springCloud
结束。感谢阅读!
发表评论
最新留言
关注你微信了!
[***.104.42.241]2025年05月02日 04时02分07秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Tomcat启动报404(eclipse)
2021-05-24
0X3协议与数据包
2021-05-24
flutter 错误The method '/' was called on null.
2021-05-24
云区块链在各行业的应用场景
2019-03-21
重复执行 cp -r 得到的结果有什么区别
2019-03-21
不会Fiddler安装和基本使用教程?看这篇就够了
2019-03-21
Makefile--Make运行
2019-03-21
C++ 函数需要有返回值,但非全分支return(RVO)
2019-03-21
常用Android模拟器的默认监听端口(转载)
2019-03-21
unicorn教程三
2019-03-21
Exploit-Exercise之Protostar-format
2019-03-21
python解释器环境问题
2019-03-21
hakcinglab解密关WP
2019-03-21
ubuntu系统重新更新系统服务
2019-03-21
深度学习与机器学习配置
2019-03-21
文档矫正
2019-03-21
MONASH-creatiue coding
2019-03-21
卷积的可视化学习
2019-03-21
图像质量评估仿真
2019-03-22
PHP使用之图片上传程序(完整版)
2019-03-22