干货实战-RabbitMQ的消息高可用和确认消费
发布日期:2021-05-20 08:01:02 浏览次数:22 分类:精选文章

本文共 2979 字,大约阅读时间需要 9 分钟。

RabbitMQ消息高可用与确认消费

作为一名技术栏目作者,我在写作过程中深知写作不易,因此衷心感谢您的浏览和支持。您的点赞、评论和收藏将是我写下去的最大动力。

在实际项目中,RabbitMQ作为一款强大的消息中间件,不仅提供了便利的消息表示和传输机制,更重要的是其在高可用性和消息可靠性方面的表现令人瞩目。本文将重点探讨RabbitMQ在消息高可用和确认消费方面的实现原理和实践经验。

消息确认是阻止消息丢失的关键机制。以下是确认消费的工作流程:

  • 发送端确认:生产者在消息发送成功后会接收到确认通知(confirm),即可确定消息已成功入队。
  • 消费端确认:当消费者读取消息并成功处理后,需主动发送确认(ack)消息告知RabbitMQ消息已经被成功消费。
  • 确认消费的核心意义在于保障消息的可靠传输,确保在网络故障或服务崩溃时不会丢失消息。

    在实际使用中,我们经常面临以下问题及其优化方案:

    1. 消息发送失败或未送达

    此问题的根本原因是缺乏对消息发送结果的确认机制。我们可以通过以下配置解决:

    @Beanpublic RabbitTemplate rabbitTemplate() {    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(new ConfirmCallback() {        @Override        public void confirm(CorrelationData correlationData, boolean ack, String cause) {            if (!ack) {                log.warn("消息发送失败: exchange={}, routingKey={}, cause={}", correlationData.getExchange(), correlationData.getRoutingKey(), cause);            }        }    });    rabbitTemplate.setReturnCallback(new ReturnCallback() {        @Override        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {            log.warn("消息丢失: exchange={}, routingKey={}, replyCode={}, replyText={}", exchange, routingKey, replyCode, replyText);        }    });    return rabbitTemplate;}

    2. RabbitMQ服务崩溃导致消息丢失

    为避免因服务崩溃导致消息丢失,我们需要采取持久化配置:

    @Bean(name = "directQueueOne")public Queue directQueueOne() {    return new Queue(environment.getProperty("mq.yzy.info.directqueue.name"), true);}@Bean(name = "directQueueTwo")public Queue directQueueTwo() {    return new Queue(environment.getProperty("mq.yzy.info.directqueue.name1"), true);}@Beanpublic DirectExchange directExchange() {    return new DirectExchange(environment.getProperty("mq.yzy.info.directexchange.name"), true, false);}

    此外,在发布消息时,我们还需要确保消息的持久化模式:

    rabbitTemplate.convertAndSend(env.getProperty(ROUTING), user, new MessagePostProcessor() {    @Override    public Message postProcessMessage(Message message) throws AmqpException {        MessageProperties properties = message.getMessageProperties();        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);        properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, User.class);        return message;    }}, new CorrelationData(uuid));

    3. 消费者监听失败或处理异常导致重复消费

    为了防止消息重复消费,我们可以通过启用消息确认机制:

    ackets.setAutomaticRecoveryEnabled(true);acks = rabbitTemplate.getConnection().setAutoRecovery(true);acks.setAutoRecovery(true);/** * 消费客户端确认策略 */acks.setConfirmListener(new ActionListener() {    @Override    public void onConfirm(Type type) {        if (Type.PUBLISHED == type) {            log.info("消息被确认,类型:" + type);        }    }});acks.start();

    RabbitMQ 提供了三种确认方式:

  • NONE:无需确认消费。在这种模式下,消息一旦发布到队列中,消费者即可读取,没有需要发送确认消息。
  • AUTO:自动确认消费。消费者在读取消息时,RabbitMQ会自动在消息处理完成后发送确认消息,将消息从队列中移除。
  • MANUAL:手动确认消费。消费者在处理完成后,需要循环发送确认消息告知 RabbitMQ 消息已被成功处理。
  • 通过灵活配置RabbitMQ的消息确认机制,可以有效保障消息的高可用性,避免因系统故障导致消息丢失和重复消费情景的发生。

    以上就是RabbitMQ在消息高可用和确认消费方面的实践经验,希望对您有所帮助。如果您有任何关于RabbitMQ的疑问或问题,欢迎随时交流!

    上一篇:干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型
    下一篇:干货实战演练-RabbitMQ订阅消息模型

    发表评论

    最新留言

    留言是一种美德,欢迎回访!
    [***.207.175.100]2025年04月18日 11时35分42秒