基本概念
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ。
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ
安装过程网上有很多,在此我就省略了。
其中包含几个概念:
publisher:生产者,也就是发送消息的一方
consumer:消费者,也就是消费消息的一方
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
整个过程梳理下来如下:
生产者[发送消息给交换机,交换机路由消息(并不是存储),然后决定将消息投递到某个或者某些队列],这时,队列存储这些消息,等待消费者处理,消费者根据自身的需求和能力选择处理消息。
在同一个mq集群下,可以有多个virtual host,他们之间互相隔离,有各自的exchange、queue。
发送消息
上面我们了解了基本过程,我们就来演示一下。
- 创建两个queue
如下:
- 操作交换机发出消息
我们将amq.fanout绑定我们刚刚创建的两个队列,并发送消息Hello,mq!。
消息已发送!
- 成功接收消息
也可以看见消息的具体内容,也可以发现这里的消息无法更改
数据隔离
上面所说同一个mq集群下,可以有多个virtual host,他们之间互相隔离,有各自的exchange、queue。
我们也可以进行验证。
- 创建一个
T00超级管理员
但是这个时候,我们无法操作,队列,交换机等,因为“没有我们名下”的虚拟主机。
- 创建虚拟主机
切换成我们新创建的用户,然后创建一个虚拟主机
他们两个虚拟主机的数据是相互隔离的
SpringAMQP
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
查看官方文档,java操作MQ的hello,world都及其复杂
Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
入门案例
- 引入相关依赖
使用SpringAMQP,我们要先引入相关依赖(这里是在父工程中引入的)
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
我们以它为父工程创建两个微服务模块来模拟消息的收发过程。
- 配置RabbitMQ服务端信息
为了方便测试,我们先新建一个simple.queue
先配置MQ地址,在publisher服务的application.yml中添加配置
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.100.128 port: 5672 virtual-host: /T00 username: TOO password: 101
|
- 发送消息
springAMQP中封装了一个RabbitTemplate工具类,我们可以直接使用工具类来发送消息。
1 2 3 4 5 6 7 8 9 10
| @Autowired private RabbitTemplate rabbitTemplate;
@Test void testSendMessage2Queue(){
String queueName = "simple.queue"; String msg = "helloe,mq!"; rabbitTemplate.convertAndSend(queueName,msg); }
|
直接运行,可以在控制台中找到发送的消息
- 接收消息
springAMQP中封装了一个注解,直接在注解中queues数组中指定需要监听的队列名字即可
1 2 3 4 5 6
| @RabbitListener(queues = "simple.queue") public void listenerSimpleQueue(String msg){ System.out.println("收到了消息:"+ msg); }
|
运行之后成功收到了消息
- 这里被注解的方法的参数类型应和生产者的一致,
@RabbitListener注解的作用就是将接收到的消息封装在方法的参数中。
WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息,每个消息只会被一个消费者处理一次,用来实现任务的“分工处理”。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
接下来,我们就来模拟这样的场景。
首先,我们在控制台创建一个新的队列,命名为work.queue:
RabbitMQ 有两种分发模式:
默认轮询发配
公平发配
我们首先模拟轮询发配
轮询发配
两个消费者性能相同
- 消息发送
1 2 3 4 5 6 7 8 9
| @Test void testWorkQueue() throws InterruptedException { String queueName = "work.queue"; for (int i = 1; i <= 50; i++) { String msg = "hello,worker!,message_" + i; rabbitTemplate.convertAndSend(queueName,msg); Thread.sleep(20); } }
|
- 消息接收
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "work.queue") public void listenerWorkQueue1(String msg){ System.out.println("消费者1 收到了消息:" + msg); }
@RabbitListener(queues = "work.queue") public void listenerWorkQueue2(String msg){ System.err.println("消费者1 收到了消息:" + msg); }
|
- 运行
可以看见,两个消费者如果性能一致时类似于轮询,一人消费一条消息,将消息均分了。
两个消费者性能不同
让两个消费者接收消息后停顿不同时间来实现模拟性能不同,消费者1每秒可以处理50条消息,而消费者2只能处理5条。
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "work.queue") public void listenerWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1 收到了消息:" + msg); Thread.sleep(20); }
@RabbitListener(queues = "work.queue") public void listenerWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2 收到了消息:" + msg); Thread.sleep(200); }
|
结果如下:
可以看见他们还是均分了50条消息,后面的时间全是性能比较差的消费者2在处理,性能好的消费者1反倒空闲了。
这样显然是不合理的,浪费了系统的性能,这时我们就应该使用公平分配模式。
公平分配
公平分配通俗来讲,就是能者多劳。
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
这样配置之后再次运行:
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
交换机
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
接下来,我们就来一一验证。
Fanout
- 准备工作
- 创建一个名为
T00.fanout的交换机,类型是Fanout
- 创建两个队列
fanout.queue1和fanout.queue2,绑定到交换机T00.fanout
过程和上面控制台发送消息一样,就略过了。
- 消息发送
fanout交换机没有这个key,传入一个空字符串即可:
1 2 3 4 5 6 7 8
| @Test public void testFanoutExchange() { String exchangeName = "T00.fanout"; String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
- 消息接收
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "fanout.queue1") public void listenerFanoutQueue1(String msg){ System.out.println("消费者1 收到了fanout.queue1消息:" + msg); }
@RabbitListener(queues = "fanout.queue2") public void listenerFanoutQueue2(String msg){ System.out.println("消费者2 收到了fanout.queue2消息:" + msg); }
|
- 运行
两者均接收到了消息,就如同广播一样,在广播的范围内,每个人都能收到消息。
Direct
在Fanout交换机中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
Direct交换机有以下特点:
队列与交换机的绑定:不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey
Exchange会根据绑定队列的Routingkey和所路由消息的Routingkey是否匹配,进行消息的发送,也就是是说只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
接下来我们进行演示:
- 准备工作
- 创建一个名为
T00.direct的交换机,类型是direct
- 再分别创建一个
Routing key为“red”,“blue”的direct.queue1和一个Routing key为“red”,“yellow”的direct.queue2
- 消息发送
1 2 3 4 5 6 7 8
| @Test public void testSendDirectExchange() { String exchangeName = "T00.direct"; String message = "红色"; rabbitTemplate.convertAndSend(exchangeName, "red", message); }
|
1 2 3 4 5 6 7 8
| @Test public void testSendDirectExchange() { String exchangeName = "T00.direct"; String message = "蓝色"; rabbitTemplate.convertAndSend(exchangeName, "bule", message); }
|
- 消息接收
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "direct.queue1") public void listenerDirectQueue1(String msg){ System.out.println("消费者1 收到了direct.queue1消息:" + msg); }
@RabbitListener(queues = "direct.queue2") public void listenerDirectQueue2(String msg){ System.out.println("消费者2 收到了direct.queue2消息:" + msg); }
|
- 运行
如果是red,他们都可以收到
但是如果是blue,就只有2才能收到
Topic
其实Topic与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,唯一的区别就是Topic可以让队列在绑定BindingKey 的时候使用通配符。
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
现在我们来实现一下:
- 准备工作
假如此时publisher发送的消息使用的RoutingKey共有四种:
china.news 代表有中国的新闻消息;
china.weather 代表中国的天气消息;
japan.news 则代表日本新闻
japan.weather 代表日本的天气消息;
解释:
topic.queue1绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
topic.queue2绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
- 发送消息
1 2 3 4 5 6 7 8
| @Test public void testSendTopicExchange() { String exchangeName = "T00.topic"; String message = "*****"; rabbitTemplate.convertAndSend(exchangeName, "***", message); }
|
- 接收消息
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "topic.queue1") public void listenerTopicQueue1(String msg){ System.out.println("消费者1 收到了topic.queue1消息:" + msg); }
@RabbitListener(queues = "topic.queue2") public void listenerTopicQueue2(String msg){ System.out.println("消费者2 收到了topic.queue2消息:" + msg); }
|
- 运行
routingKey为japan.news时,只有消费者2收到了消息
routingKey为china.news时,都收到了消息
routingKey为china.wether时,只有消费者1收到了消息
符合我们的推测。
声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
基本API
SpringAMQP提供了一个Queue类,用来创建队列:
SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:
我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:
而在绑定队列和交换机时,则需要使用BindingBuilder类来创建Binding对象:
我们有两种方法声明队列和交换机
new一个对应的类,在后面指出名字即可
调用builder函数
下面我们共同使用两种办法来声明交换机和队列以及他们的绑定关系。
fanout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Configuration public class FanoutConfiguration {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("TOO.fanout2"); }
@Bean public Queue fanoutQueue3(){ return new Queue("fanout.queue3"); }
@Bean public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange); }
@Bean public Queue fanoutQueue4(){ return new Queue("fanout.queue4"); }
@Bean public Binding bindingQueue2(Queue fanoutQueue4, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange); } }
|
direct
对于一个direct的交换机进行绑定操作会显得比较繁琐(因为builder函数一次只能设置一个key)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Configuration public class DirectConfiguration {
@Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange("TOO.direct").build(); }
@Bean public Queue directQueue1(){ return new Queue("direct.queue1"); }
@Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); }
@Bean public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); }
@Bean public Queue directQueue2(){ return new Queue("direct.queue2"); }
@Bean public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("red"); }
@Bean public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow"); } }
|
holy shit,这种声明方式好麻烦,因此伟大的springAMQP中还提供了一种注解声明队列和交换机的方法。
注解声明
可以在消费者注解中用bindings参数来声明队列,交换机和绑定关系,具体参数如下:
基本框架如下:
1 2 3 4 5
| @RabbitListener(bindings = @QueueBinding( value = @Queue(), exchange = @Exchange(), key = {} ))
|
我们现在删除以前于direct相关的交换机和队列,来试试用注解声明:
1 2 3 4 5 6 7 8
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1",durable = "true"), exchange = @Exchange(name = "T00.direct",type = ExchangeTypes.DIRECT), key = {"red" , "blue"} )) public void listenerDirectQueue1(String msg){ System.out.println("消费者1 收到了direct.queue1消息:" + msg); }
|
1 2 3 4 5 6 7 8
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2",durable = "true"), exchange = @Exchange(name = "T00.direct",type = ExchangeTypes.DIRECT), key = {"red" , "yellow"} )) public void listenerDirectQueue2(String msg){ System.out.println("消费者2 收到了direct.queue2消息:" + msg); }
|
运行之后可以看见成功的创建了。
消息转换器
前面我们说到生产者发送消息的类型最好与消费者参数的类型相同,但是那是接收消息,那发送消息是否有什么限定呢?
默认消息转换器
我们先来试试它默认的消息转换器
- 创建一个
object.queue队列用于接收
- 编写一个
testSendObject()用于发送消息(json格式)
1 2 3 4 5 6 7
| @Test public void testSendObject() { Map<String,Object> msg = new HashMap<>(); msg.put("name","jack"); msg.put("age",18); rabbitTemplate.convertAndSend("object.queue",msg); }
|
在发送消息过后,我们来到mq的控制台查看接收到的消息:
它并不是json格式的信息,反而是一堆看不懂的东西,这是为什么呢?
因为它默认的消息转换器是将消息jdk序列化,jdk序列化存在安全漏洞,可读性比较差(如果以后发送的消息是一个体积很大,很复杂的json,jdk序列化的消息很难维护)的缺点,那我们有没有什么方法可以解决呢?
答案就是我们自己配置消息转换器来替换默认的转换器。
配置JSON转换器
- 在
publisher和consumer两个服务中都引入依赖(父工程中引入即可):
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
- 配置消息转换器:
在publisher和consumer两个服务的启动类中添加一个Bean即可:
1 2 3 4
| @Bean public MessageConverter jacksonmessageConverter(){ return new Jackson2JsonMessageConverter(); }
|
再次发送,成功转换了