干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型
发布日期:2021-05-20 08:01:02 浏览次数:23 分类:精选文章

本文共 5157 字,大约阅读时间需要 17 分钟。

RabbitMQ专栏直通车

前言

欢迎来到RabbitMQ专栏的第二篇内容。本文将详细介绍RabbitMQ的MANUAL手动确认消费模型,结合代码实战,帮助你理解在实际项目中如何实现手动确认消费的步骤。

RabbitMQ的核心组件

在开始MANUAL手动确认消费之前,首先回顾RabbitMQ的核心组件:

  • 生产者:负责将消息发送到RabbitMQ消息队列。
  • 消费者:接收来自队列的消息并进行处理。
  • 消息:以二进制数据流形式传输,内容可以是文字、图片等。
  • 队列:消息的临时存储区,起到中转站的作用。
  • 交换机:消息的中转站,用于分发消息。
  • 路由:类似网关,用于将消息指定到特定的队列。

MANUAL手动确认消费模型

MANUAL手动确认消费意味着在消费者接收到消息后,需要手动通过代码发送ACK反馈给RabbitMQ服务器。这样消息才会从队列中被移除。

前期准备:引入相关依赖

使用Spring Boot启动项目,引入RabbitMQ相关依赖:

org.springframework.boot
spring-boot-starter-amqp
1.3.3.RELEASE

代码实战:配置RabbitMQ

application.properties中添加RabbitMQ配置:

spring:    rabbitmq:        virtual-host: /        host: 127.0.0.1        port: 5672        username: guest        password: guest

配置确认消费模型

实战配置说明:

@Configurationpublic class ManualMqConfig {    private static final Logger log = LoggerFactory.getLogger(ManualMqConfig.class);    @Autowired    private Environment environment;    @Autowired    private CachingConnectionFactory cachingConnectionFactory;    @Autowired    private ManualConsumer manualConsumer;    @Bean("manualQueueOne")    public Queue manualQueueOne() {        return new Queue(getEnvironmentVariable("mq.yzy.info.manualqueue.name"), true);    }    @Bean    public DirectExchange manualExchange() {        return new DirectExchange(getEnvironmentVariable("mq.yzy.info.manualexchange.name"), true, false);    }    @Bean    public Binding basicBindingOne() {        return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(getEnvironmentVariable("mq.yzy.info.manualrouting.key.name"));    }    @Bean("manualListenerContainer")    public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue) {        SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();        factory.setConnectionFactory(cachingConnectionFactory);        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);        factory.setConcurrentConsumers(1);        factory.setMaxConcurrentConsumers(1);        factory.setPrefetchCount(1);        factory.setQueues(manualQueue);        factory.setMessageListener(manualConsumer);        return factory;    }}

配置消息持久化

确保消息持久化配置正确:

public class ManualMqConfig {    // ...        @Bean   公uisse exchange配置类似于上述代码。    // 消费者配置    @Component("manualConsumer")    public class ManualConsumer implements ChannelAwareMessageListener {        @Autowired        private ObjectMapper objectMapper;        @Override        public void onMessage(Message message, Channel channel) throws Exception {            MessageProperties messageProperties = message.getMessageProperties();            long tag = messageProperties.getDeliveryTag();            try {                byte[] msg = message.getBody();                Student student = objectMapper.readValue(msg, Student.class);                log.info("基于MANUAL机制-确认消息模式-人为手动确定消费-监听到消息:{}", objectMapper.writeValueAsString(student));                channel.basicAck(tag, true);            } catch (Exception e) {                log.error("确认消息模式-人为手动确定消费-发生异常:", e.fillInStackTrace());                channel.basicReject(tag, false);            }        }    }

生产者配置

实现消息发送:

@Componentpublic class ManualPublisher {    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private Environment env;    @Autowired    private ObjectMapper objectMapper;    private final String EXCHANGE = "mq.yzy.info.manualexchange.name";    private final String ROUTING_KEY = "mq.yzy.info.manualrouting.key.name";    public void sendAutoMsg(Student student) {        try {            if (student != null) {                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());                rabbitTemplate.setExchange(env.getProperty(EXCHANGE));                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(student)).build();                rabbitTemplate.convertAndSend(ROUTING_KEY, msg, new MessagePostProcessor() {                    public Message postProcessMessage(Message message) throws AmqpException {                        MessageProperties messageProperties = message.getMessageProperties();                        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);                        return message;                    }                });                log.info("基于MANUAL机制 - 生产者发出消息:{}", objectMapper.writeValueAsString(student));            }        } catch (Exception e) {            log.error("基于MANUAL机制 - 生产者发出消息-发生异常:{}", student, e.fillInStackTrace());        }    }

测试接口

通过 Swagger 测试接口:

@RestControllerpublic class ManualController {    @Autowired    private ManualPublisher manualPublisher;    @PostMapping("/manual")    public ResponseBody manual(@RequestBody @Valid Student vo) throws JsonProcessingException {        long startTime = init(objectMapper.writeValueAsString(vo));        try {            manualPublisher.sendAutoMsg(vo);            endLog(msgValue, startTime);        } catch (Exception e) {            endLogError(msgValue, startTime, e);        }        return ResponseBody.ok();    }}

代码公开

完整代码可在GitHub找到:

https://github.com/yangzhenyu07/springCloud

结束。感谢阅读!

上一篇:java Web项目 三种定时器的总结
下一篇:干货实战-RabbitMQ的消息高可用和确认消费

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年05月02日 04时02分07秒