消息丢失
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
- 发送消息时消息丢失
- 生产者发送消息到交换机时
- 交换机把消息发送给队列时
- MQ导致消息丢失
- 消息没来得及发送给消费者就消失了
- 消费者拿到消息后未消费导致消息丢失
- 消费者拿到消息没来得及消费就消失了
消息的可靠性
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消费者一定要处理消息
生产者的可靠性
生产者确认
生产者在向交换机传输消息时,有可能消息根本无法到达交换机,或者到达交换机后没有成功的将消息传递给队列,所以MQ针对这两点分别做出了保障,我们称之为生产者的确认。
Confirm模式:确保消息送达交换机
当消息成功到达 RabbitMQ 的交换机(Exchange)时,RabbitMQ 就会返回 ack 投递失败返回nack。
想要开启confirm模式也很简单,在publisher模块的application.yaml中添加配置:
1 | spring: |
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制simple:同步阻塞等待MQ的回执(等信息返回了再执行其他操作)correlated:MQ异步回调返回回执(发完消息直接做其他事情,有消息回调回来再进行处理)
Return回退机制:确保消息路由成功
在消息到达Exchange过后,MQ会返回ack,但是这并不代表消息就可以成功发送给队列,因此还需要一层return机制来保证:确保消息成功从交换机路由到目标队列,未路由的消息能被回退感知。
想要开启confirm模式也很简单,在publisher模块的application.yaml中添加配置:
1 | spring: |
生产者确认机制的实现
经过梳理整个生产者确认机制如下图:
我们现在来实现一下:
- 配置publisher模块的
application.yaml
1 | spring: |
- 配置
ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
- 配置
ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置,是一种初始化的操作,我们可以使用@PostConstruct来实现。
(@PostConstruct 是 Java 中的一个注解,常用于标记方法,使其在对象实例化并完成依赖注入后立即执行。它通常用于初始化逻辑,例如加载配置、建立连接等。)
我们建立一个MqReturnConfig配置类:
1 |
|
- 发送消息及配置
confirm
1 |
|
运行
- 所有信息均正确时,返回了ack:
- 当路由key不正确时,也返回了ack并且返回了我们编写的
ReturnCallback配置类(到达交换机,但没有成功路由):
- 当无法连接到时,也就是没有正常到达交换机时,返回了nack,同时给出了原因:
![image]()
连接可靠性:生产者自动重连机制
我们不难想到生产者发送消息时,可能会出现网络故障,导致与MQ的连接中断,这样也会导致消息的丢失。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
修改publisher模块的application.yaml文件,添加下面的内容:
1 | spring: |
也可以看见它的默认值和上面是一样的。
我们利用命令停掉RabbitMQ服务:
1 | docker stop mq |
然后测试发送一条消息,会发现会每隔1秒重试1次,(发送1s发现发送失败,再间隔1s重试,所以两次发送时间间隔2s)总共重试了3次。消息发送的超时重试机制配置成功了!
注意:
- 这个重试是连接时的重试,如果是在消息发送抛出异常时,他是不会充实的。
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
总结
为了保证消息在发送的过程中是稳定的,我们采用了生产者确认的方式,
所谓生产者确认就是使用confirm机制和return回退机制保证消息可以稳定到达队列当中;
confirm机制可以通过修改配置文件来生效,它会在消息成功投递到交换机之后返回ack来告知我们它是否到达交换机,如果没有则返回nack;
return回退机制是发生在消息成功投递到交换机之后,这时也会返回ack,但是他将消息路由到队列的过程中可能会失败,这时我们如果还配置了mandatory为ture,MQ就会将消息回调给生产者,否则会直接丢弃这条消息;
最后我们还可以通过配置confirmcallback和returncallback来设置我们在不同情况下该做什么。
消息持久化
为了保证消息在传输的过程中是稳定的,我们需要配置消息持久化,所谓持久化其实就是将MQ存储在内存中的消息写到磁盘上面。
内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,引发MQ阻塞(mq内存满了之后,会做一个PageOut操作将一些较老的消息转移到磁盘上,这个期间,消息时阻塞的,不能执行其他操作)我们来验证一下,将一条消息发100w次:
1 |
|
可以看见速率出现了明显的低谷,pageout的消息数量也巨大
所以实现数据持久化是很有必要的,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
实现
队列和交换机持久化,都是将Durable设置为true即可;
消息持久化是在发送消息时附带值为Persistent的properties:
- 持久化的消息在mq重启后,消息依旧在
与上面非持久化的消息相比,没有了太大的速率下跌,没有pageout,但是似乎还是不太理想。
LazyQueue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
控制台配置
代码配置
- 添加
x-queue-mod=lazy参数也可设置队列为Lazy模式:
1 |
|
- 当然,我们也可以基于注解来声明队列并设置为Lazy模式:
1 |
|
测试一遍发现速率 基本一直处于峰值,而且没有在内存中(没有消费者接收这个消息)。
总结
为了保证消息的持久化,我们要将队列,交换机均设置为持久化的,发出的消息也要是持久化消息,这样就算mq挂了,消息被持久化到了硬盘上,也不会消失。
消费者的可靠性
同生产者一样,消费者也需要有确认机制来保证消息能被正常消费。
消费者确认
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack; - 如果是消息处理或校验异常,自动返回
reject;
- 如果是业务异常,会自动返回
实现
通过下面的配置可以修改SpringAMQP的ACK处理方式:
1 | spring: |
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:
1 |
|
测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
我们再次把确认机制修改为auto:
1 | spring: |
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):
- 消费者处理消息后可以向MQ发 送ack回执,MQ收到ack回执后才会删除该消息。
但是当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力,我们刚刚在验证auto模式时,就发现了一条消息也可以达到惊人的数据流量。
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
消费失败重试
修改consumer服务的application.yml文件,添加内容:
1 | spring: |
重启consumer服务,重复之前的测试。可以发现:
- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回reject,消息会被丢弃
异常交换机/队列
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
- 在consumer服务中定义处理失败消息的交换机和队列
1 |
|
- 定义一个RepublishMessageRecoverer,关联队列和交换机
1 |
|
- 运行
最后可以看见,在接收三次消息之后,它将消息发给了error.queue里面还附带了全部的异常信息
总结
为了保证消费者的可靠性,首先我们通过消费者确认机制,消费者返回ack时,mq将消息删除;
返回nack时,表示消息处理异常,为例减少服务器负担,我们在此基础上增加消费者重试机制,并声明异常交换机和异常处理器,将这些消息投递并路由到其中,最后由人工统一处理。
如何保证消息的可靠性?
消息从生产者一直到消费者被其消费的过程中,每个阶段都有可能导致消息的丢失,从而使业务异常,我们要从三个方面保证消息的可靠性:
- 要保证生产者的可靠性:首先要通过confirm机制和return回调机制来实现生产者确认功能;confirm机制会在消息成功投递到交换机之后,返回
ack,反之nack,来使消息投递到交换机的过程可感知化;但是消息成功投递到交换机中并不代表可以成功路由到队列上,所以还需要return回调机制来保证消息可以顺利路由到队列中;同时我们还可以定义confirmcallback和returncallback来设置不同情况下,我们应该做什么。 - 要使消息持久化:消息持久化需要交换机,队列和消息本身都经过持久化,这样才可以把消息存储在磁盘上,即使mq挂掉,消息也不会消失。
- 要保证消费者的可靠性:首先和生产者一样,我们使用消费者确认机制,在消息被成功消费之后返回
ack,来通知mq这条消息可以删除,反之返回nack;但是为了业务的完整性,我们又使用消费者重试机制,保证消息可以被正常消费,同时又不过多占用服务器性能;在重试过后,我们可以设置一对异常处理交换器和队列,专门将重试过后但又没有被消费的消息统一管理起来,最后通过人工处理。
至此,可以基本保证消息的可靠性。
