死信以及延迟消息
T00 Lv2

什么是死信

死信,顾名思义就是无法被消费的消息. 当一个队列中的消息满足下 列情况之一时,可以成为死信(dead letter):

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  1. 消费者不要了(消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false)。
  2. 消息过期了(超过了队列或消息本身设置的过期时间)。
  3. 投递的队列消息满了,最早的消息可能成为死信。

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange ——DLX)。

死信队列(Dead Letter Queue,简称 DLQ)是用于接收无法被正常消费的消息的一种特殊队列机制,RabbitMQ 提供死信交换机(DLX)来处理这些消息。

他们之间的关系是这样的:死信 ➝ DLX ➝ routingKey ➝ 死信队列(DLQ)

那他们有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
    • 1,2点其实就是对消费者异常处理的兜底手段
  3. 收集因TTL(有效期)到期的消息(这个作用就可以让我们实现延迟消息的功能)

延迟消息

试想电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL(可以是队列的TTL也可以是消息本身的TTL)
  • 延迟消息插件

DLX + TTL

我们先缕清一下思路

image
  1. 首先,生产者发送一个具有TTLexpiration属性决定)的消息给一个simple.direct,它会将消息发送给由dead-letter-exchange属性指定了死信交换机的simple.queue
  2. 这时消息因为没有消费者,所有会一直存在于simple.queue,一旦到达TTL所限制的时间,它就会成为死信,从而进入死信交换机dlx.direct
  3. 最后死信交换机将消息给dlx.queue,然后由对应的消费者处理延迟消息

我们编写好生产者和消费者的代码,验证一下

  1. 生产者
1
2
3
4
5
6
7
8
9
@Test
void testSendTTLMessage2Queue(){
Message msg = MessageBuilder
.withBody("101".getBytes(StandardCharsets.UTF_8))
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend("simple.direct","",msg);
log.info("我已发送消息");
}
  1. 消费者
1
2
3
4
@RabbitListener(queues = "dlx.queue")
public void listenerDlxQueue(String msg){
log.info("延时消费者接收到消息:" + msg );
}
  1. 运行
image

image可以看到,两次操作的时间正好是我们设置的10s,延迟消息实现成功。

  • 其实还可以设置队列的TTL来控制消息的过期时间,与上雷同,我就不演示了。

DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog

安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

1
docker volume inspect mq-plugins

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
image

即可


声明延迟交换机

  1. 注解的方式

多增加一个属性delayed,将其属性值设置为true

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
  1. 基于@Bean的方式
1
2
3
4
5
6
7
8
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}

创建一个延迟交换机试试

可以看到延迟转换器的参数和其他的是不同的。

image

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
image image

间隔5s,实现成功。

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息


总结

死心队列/交换机

首先,我们要知道死信是什么,顾名思义就是死掉的消息,它不会再被消费者消费,它会满足以下三点之一:1.被消费者拒绝;2.消息的TTL过期;3.投递的队列满了,最开始进队列的消息就有可能成为死信;

如果一个队列指定了deed-letter-exhange属性,它指定的这个交换机就成为死信交换机,死信交换机路由的队列就是死信队列

死信队列可以处理消费失败的消息,也可以处理过期消息。

延迟队列

延迟队列就是指消息到达mq之后不会立即被消费,它会等到设置的TTL过期之后,消费者才能拿到这个消息进行消费。

比如订单的超时取消, 订单信息被放到mq中, 30分钟未支付订单就取消. 如果使用延时队列, 那监听mq的消费者从mq中直接拿到的就是,30分钟未支付订单的信息, 然后直接取消订单. 避免轮询数据库查找超时订单.

实现延迟队列

实现延迟队列我有两种策略

  1. 发送消息时,给queue1中的消息设置TTL,然后通过交换机发送到queue1,queue1不绑定消费者,它的消息指向queue2,这样当TTL失效后,监听queue2的消费者就是接收的延迟消息;它的优点是RabbitMQ原生,缺点是不够精确,因为MQ会优先处理队头的消息。
  2. 使用DelayExchange插件,通过给设置delay属性的方法声明延迟交换机,被其绑定的队列就是延迟队列,发送出来的带有TTL消息,被消费者消费时,就是延迟消息;它的优点是无需死信中转精度更高,但是需要下载插件。
Powered by Hexo & Theme Keep
Total words 55.8k Unique Visitor Page View