本文共 1043 字,大约阅读时间需要 3 分钟。
消息可靠性投递背景
消息可靠性投递方案
两个重要监听:
ConfirmCallback:
1. 判断消息是否到达exchange,exchange之后不负责。
2. 成功到达exchange并且持久化之后回调。或者没有到达exchange,持久化失败都会回调。ack为true或者false表示
ReturnCallback:
1. 判断exchange之后,比如到达exchange但是没有队列,路由失败就会回调。只有失败才会回调。
2. 必须开启setMandatary(true)。否则不会回调ReturnCallback
生产者消息可靠投递
1. 生产者执行业务逻辑,存储消息记录,消息状态为0待确认(两个操作原子操作,利用事务),重试次数为0,然后发送消息给MQ。
2 生产者监听ConfirmCallBackListener,确保消息到达Exchange(注意:只能确保是否到达exchagne,不是队列,更不是消费者)。成功与否都会回调,所以能确保消息是否到达Exchange并且被持久化
3.下一步,消息从exchange能够到达queue,注册RetrunCallBack监听。注意:只有失败才会回调。如果失败,消息路由失败,没有到达队列。在ReturnCallBack修改消息状态从已确认-待确认,进行消息补偿逻辑。
4. 消息路由已经路由到queue,如何保证消费者成功处理消息呢?比如消费者处理消息异常。这个节点依靠消费者确认机制。消费者在消费消息时,channel.basicAck或者channel.basicNack进行确认或者拒绝。如果broker接受到到是拒绝,消息不会删除,仍旧在队列中。从而保证消费者一定成功成功处理消息。
5 消费补偿机制,利用分布式的定时任务读取待确认并且超过一分钟等消息(为了防止接收confirmcallback延迟导致重复发送问题),重新发送,并记录retry count,如果大于3次,设置为发送失败,需要人工干预,防止消息堆积太多。
6 消息去重机制,因为生产者发送一条消息后,exchange接收失败导致confirmcallback没有回调或者网络问题导致生产者没有收到(后者导致消息重复发送),事务补偿机制重新发送(间隔需要设置相对合理等值),所以消费者需要进行消息去重。
上面是主要的逻辑,前提的配置条件是:exchange,borker,queue都要持久化配置,并且采用镜像队列等多备份
架构图
参考文档:
转载地址:https://kerry.blog.csdn.net/article/details/88196449 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!