什么是死信
死信,顾名思义就是无法被消费的消息. 当一个队列中的消息满足下 列情况之一时,可以成为死信(dead letter):
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者不要了(消费者使用
basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false)。 - 消息过期了(超过了队列或消息本身设置的过期时间)。
- 投递的队列消息满了,最早的消息可能成为死信。
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange ——DLX)。
死信队列(Dead Letter Queue,简称 DLQ)是用于接收无法被正常消费的消息的一种特殊队列机制,RabbitMQ 提供死信交换机(DLX)来处理这些消息。
他们之间的关系是这样的:死信 ➝ DLX ➝ routingKey ➝ 死信队列(DLQ)
那他们有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 1,2点其实就是对消费者异常处理的兜底手段
- 收集因TTL(有效期)到期的消息(这个作用就可以让我们实现延迟消息的功能)
延迟消息
试想电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。
但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?
像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。
在RabbitMQ中实现延迟消息也有两种方案:
- 死信交换机+TTL(可以是队列的TTL也可以是消息本身的TTL)
- 延迟消息插件
DLX + TTL
我们先缕清一下思路
- 首先,生产者发送一个具有TTL(
expiration属性决定)的消息给一个simple.direct,它会将消息发送给由dead-letter-exchange属性指定了死信交换机的simple.queue。 - 这时消息因为没有消费者,所有会一直存在于
simple.queue,一旦到达TTL所限制的时间,它就会成为死信,从而进入死信交换机dlx.direct。 - 最后死信交换机将消息给
dlx.queue,然后由对应的消费者处理延迟消息。
我们编写好生产者和消费者的代码,验证一下
- 生产者
1 |
|
- 消费者
1 |
|
- 运行
可以看到,两次操作的时间正好是我们设置的10s,延迟消息实现成功。
- 其实还可以设置队列的TTL来控制消息的过期时间,与上雷同,我就不演示了。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog
安装
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
1 | docker volume inspect mq-plugins |
结果如下:
1 | [ |
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
1 | docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
即可
声明延迟交换机
- 注解的方式
多增加一个属性delayed,将其属性值设置为true
1 |
|
- 基于
@Bean的方式
1 |
|
创建一个延迟交换机试试
可以看到延迟转换器的参数和其他的是不同的。
发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
1 |
|
间隔5s,实现成功。
注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
总结
死心队列/交换机
首先,我们要知道死信是什么,顾名思义就是死掉的消息,它不会再被消费者消费,它会满足以下三点之一:1.被消费者拒绝;2.消息的TTL过期;3.投递的队列满了,最开始进队列的消息就有可能成为死信;
如果一个队列指定了deed-letter-exhange属性,它指定的这个交换机就成为死信交换机,死信交换机路由的队列就是死信队列。
死信队列可以处理消费失败的消息,也可以处理过期消息。
延迟队列
延迟队列就是指消息到达mq之后不会立即被消费,它会等到设置的TTL过期之后,消费者才能拿到这个消息进行消费。
比如订单的超时取消, 订单信息被放到mq中, 30分钟未支付订单就取消. 如果使用延时队列, 那监听mq的消费者从mq中直接拿到的就是,30分钟未支付订单的信息, 然后直接取消订单. 避免轮询数据库查找超时订单.
实现延迟队列
实现延迟队列我有两种策略
- 发送消息时,给
queue1中的消息设置TTL,然后通过交换机发送到queue1,queue1不绑定消费者,它的消息指向queue2,这样当TTL失效后,监听queue2的消费者就是接收的延迟消息;它的优点是RabbitMQ原生,缺点是不够精确,因为MQ会优先处理队头的消息。 - 使用DelayExchange插件,通过给设置
delay属性的方法声明延迟交换机,被其绑定的队列就是延迟队列,发送出来的带有TTL消息,被消费者消费时,就是延迟消息;它的优点是无需死信中转,精度更高,但是需要下载插件。