消息的可靠性
T00 Lv2

消息丢失

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

image

消息从生产者到消费者的每一步都可能导致消息丢失:

  1. 发送消息时消息丢失
    • 生产者发送消息到交换机时
    • 交换机把消息发送给队列时
  2. MQ导致消息丢失
    • 消息没来得及发送给消费者就消失了
  3. 消费者拿到消息后未消费导致消息丢失
    • 消费者拿到消息没来得及消费就消失了

消息的可靠性

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

生产者的可靠性

生产者确认

生产者在向交换机传输消息时,有可能消息根本无法到达交换机,或者到达交换机后没有成功的将消息传递给队列,所以MQ针对这两点分别做出了保障,我们称之为生产者的确认。

Confirm模式:确保消息送达交换机

当消息成功到达 RabbitMQ 的交换机(Exchange)时,RabbitMQ 就会返回 ack 投递失败返回nack

想要开启confirm模式也很简单,在publisher模块的application.yaml中添加配置:

1
2
3
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执(等信息返回了再执行其他操作)
  • correlated:MQ异步回调返回回执(发完消息直接做其他事情,有消息回调回来再进行处理)

Return回退机制:确保消息路由成功

在消息到达Exchange过后,MQ会返回ack,但是这并不代表消息就可以成功发送给队列,因此还需要一层return机制来保证:确保消息成功从交换机路由到目标队列,未路由的消息能被回退感知。

想要开启confirm模式也很简单,在publisher模块的application.yaml中添加配置:

1
2
3
4
5
6
spring:
rabbitmq:
publisher-confirm-type: correlated # 启用 confirm 机制
publisher-returns: true # 启用 return 回调(还需 mandatory 才生效)
template:
mandatory: true # 配合 return 生效,消息未路由才触发回调,默认false会在失败后直接丢弃消息

生产者确认机制的实现

经过梳理整个生产者确认机制如下图:

image

我们现在来实现一下:

  1. 配置publisher模块的application.yaml
1
2
3
4
5
6
spring:
rabbitmq:
publisher-confirm-type: correlated # 启用 confirm 机制
publisher-returns: true # 启用 return 回调(还需 mandatory 才生效)
template:
mandatory: true # 配合 return 生效,消息未路由才触发回调
  1. 配置ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

image

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image
  1. 配置ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置,是一种初始化的操作,我们可以使用@PostConstruct来实现。

@PostConstruct 是 Java 中的一个注解,常用于标记方法,使其在对象实例化并完成依赖注入后立即执行。它通常用于初始化逻辑,例如加载配置、建立连接等。)

我们建立一个MqReturnConfig配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@AllArgsConstructor
@Configuration
public class MqReturnConfig {

private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
  1. 发送消息及配置confirm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
void testConfigCallback() throws InterruptedException {
//1.创建cd
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
//2.添加confirm callback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回执");
if(result.isAck()){
// 消息发送成功
log.debug("消息发送成功,收到ack");
}else {
log.error("消息发送失败,收到nack,原因:{}" ,result.getReason());
}
}

@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败",ex);
}
});

rabbitTemplate.convertAndSend("T00.direct","red","hello",cd);

Thread.sleep(2000); //因为需要等待消息发出才能得到回调
}
  1. 运行

    1. 所有信息均正确时,返回了ack:
    image
    1. 当路由key不正确时,也返回了ack并且返回了我们编写的ReturnCallback配置类(到达交换机,但没有成功路由):
    image
    1. 当无法连接到时,也就是没有正常到达交换机时,返回了nack,同时给出了原因:
    image

连接可靠性:生产者自动重连机制

我们不难想到生产者发送消息时,可能会出现网络故障,导致与MQ的连接中断,这样也会导致消息的丢失。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

修改publisher模块的application.yaml文件,添加下面的内容:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

也可以看见它的默认值和上面是一样的。

image

我们利用命令停掉RabbitMQ服务:

1
docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,(发送1s发现发送失败,再间隔1s重试,所以两次发送时间间隔2s)总共重试了3次。消息发送的超时重试机制配置成功了!

image

注意:

  • 这个重试是连接时的重试,如果是在消息发送抛出异常时,他是不会充实的。
  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

总结

为了保证消息在发送的过程中是稳定的,我们采用了生产者确认的方式,

所谓生产者确认就是使用confirm机制和return回退机制保证消息可以稳定到达队列当中;

confirm机制可以通过修改配置文件来生效,它会在消息成功投递到交换机之后返回ack来告知我们它是否到达交换机,如果没有则返回nack

return回退机制是发生在消息成功投递到交换机之后,这时也会返回ack,但是他将消息路由到队列的过程中可能会失败,这时我们如果还配置了mandatory为ture,MQ就会将消息回调给生产者,否则会直接丢弃这条消息;

最后我们还可以通过配置confirmcallback和returncallback来设置我们在不同情况下该做什么。


消息持久化

为了保证消息在传输的过程中是稳定的,我们需要配置消息持久化,所谓持久化其实就是将MQ存储在内存中的消息写到磁盘上面。

内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,引发MQ阻塞(mq内存满了之后,会做一个PageOut操作将一些较老的消息转移到磁盘上,这个期间,消息时阻塞的,不能执行其他操作)我们来验证一下,将一条消息发100w次:

1
2
3
4
5
6
7
8
9
@Test
void testPageOut(){
Message msg = MessageBuilder
.withBody("101".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend("simple.queue",msg);
}

可以看见速率出现了明显的低谷,pageout的消息数量也巨大

image

所以实现数据持久化是很有必要的,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

实现

队列和交换机持久化,都是将Durable设置为true即可;

消息持久化是在发送消息时附带值为Persistentproperties

image
  • 持久化的消息在mq重启后,消息依旧在

与上面非持久化的消息相比,没有了太大的速率下跌,没有pageout,但是似乎还是不太理想。

image

LazyQueue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

控制台配置

代码配置

  1. 添加x-queue-mod=lazy参数也可设置队列为Lazy模式:
1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
  1. 当然,我们也可以基于注解来声明队列并设置为Lazy模式:
1
2
3
4
5
6
7
8
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
....
}

测试一遍发现速率 基本一直处于峰值,而且没有在内存中(没有消费者接收这个消息)。

image

总结

为了保证消息的持久化,我们要将队列,交换机均设置为持久化的,发出的消息也要是持久化消息,这样就算mq挂了,消息被持久化到了硬盘上,也不会消失。


消费者的可靠性

同生产者一样,消费者也需要有确认机制来保证消息能被正常消费。

消费者确认

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

实现

通过下面的配置可以修改SpringAMQP的ACK处理方式:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

1
2
3
4
5
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("消费者接收到消息:" + msg );
throw new MessageConversionException("故意的");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

我们再次把确认机制修改为auto:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

image
  • 消费者处理消息后可以向MQ发 送ack回执,MQ收到ack回执后才会删除该消息。

但是当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力,我们刚刚在验证auto模式时,就发现了一条消息也可以达到惊人的数据流量。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

消费失败重试

修改consumer服务的application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

异常交换机/队列

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

image
  1. 在consumer服务中定义处理失败消息的交换机和队列
1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
  1. 定义一个RepublishMessageRecoverer,关联队列和交换机
1
2
3
4
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
  1. 运行

最后可以看见,在接收三次消息之后,它将消息发给了error.queue里面还附带了全部的异常信息

image image

总结

为了保证消费者的可靠性,首先我们通过消费者确认机制,消费者返回ack时,mq将消息删除;

返回nack时,表示消息处理异常,为例减少服务器负担,我们在此基础上增加消费者重试机制,并声明异常交换机和异常处理器,将这些消息投递并路由到其中,最后由人工统一处理。


如何保证消息的可靠性?

消息从生产者一直到消费者被其消费的过程中,每个阶段都有可能导致消息的丢失,从而使业务异常,我们要从三个方面保证消息的可靠性:

  1. 要保证生产者的可靠性:首先要通过confirm机制和return回调机制来实现生产者确认功能;confirm机制会在消息成功投递到交换机之后,返回ack,反之nack,来使消息投递到交换机的过程可感知化;但是消息成功投递到交换机中并不代表可以成功路由到队列上,所以还需要return回调机制来保证消息可以顺利路由到队列中;同时我们还可以定义confirmcallback和returncallback来设置不同情况下,我们应该做什么。
  2. 要使消息持久化:消息持久化需要交换机,队列和消息本身都经过持久化,这样才可以把消息存储在磁盘上,即使mq挂掉,消息也不会消失。
  3. 要保证消费者的可靠性:首先和生产者一样,我们使用消费者确认机制,在消息被成功消费之后返回ack,来通知mq这条消息可以删除,反之返回nack;但是为了业务的完整性,我们又使用消费者重试机制,保证消息可以被正常消费,同时又不过多占用服务器性能;在重试过后,我们可以设置一对异常处理交换器和队列,专门将重试过后但又没有被消费的消息统一管理起来,最后通过人工处理。

至此,可以基本保证消息的可靠性。

Powered by Hexo & Theme Keep
Total words 55.8k Unique Visitor Page View