异步通讯组件MQ
RabbitMQ
高性能的异步通讯组件
1. 同步异步和MQ技术选型
MQ:消息队列,存放消息的队列,也就是异步调用中的Broker
1.1 同步调用
微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。
这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。
1.2 异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。
这样,发送消息的人和接收消息的人就完全解耦了。
1.3 MQ技术选型
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2. RabbitMQ
RabbitMQ的整体架构及核心概念
-
virtual-host:虚拟主机,起到数据隔离的作用
-
publisher:消息发送者
-
consumer:消息的消费者
-
queue:队列,存储消息
-
exchange:交换机,负责路由消息
2.1 入门
消息发送的注意事项
- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
3. java客户端
3.1 快速入门
-
引入spring-boot-starter-amqp依赖
-
进行地址等相关配置
-
发送消息:利用
RabbitTemplate
实现消息发送:rabbitTemplate.convertAndSend(queueName,message);
-
接收消息:利用@RabbitListener注解声明要监听的队列,监听消息
SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
@RabbitListener(queues = "simple.queue")
3.2 WorkQueues
-
Work Queueus,任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息,可以加快消息处理速度
-
同一条消息只会被一个消费者处理
-
在默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积
**解决方法:**因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息
3.3 交换机
交换机的主要作用是
- 接收发送者发送的消息,
- 将消息路由到与其绑定的队列
常见交换机的类型有:
- Fanout:广播
- Direct:定向
- Topic:话题
Fanout交换机
会将接收的消息路由到每一个与其绑定的queue,所以也叫广播模式
三个参数:交换机名,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName,null,message);
Direct交换机
Direct交换机会将接收到的消息根据规则路由到指定的队列中,也成为定向路由
- 队列与交换机的绑定设置一个BindingKey
- 发布者发送消息时,指定消息的
RoutingKey
。 - Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
多个队列具有相同的RoutingKey则和Fanout功能类似
Topic交换机
Topic交换机也是基于RoutingKey做消息路由的,但是routingKey通常时多个单词的组合,并以.进行分割
Topich和Direct交换机的差异?
Topic的routingKey和bindingKey可以是多个单词,以.分割,
Topic的交换机与队列绑定时的bindingKey可以指定通配符
#表示0个或多个单词,*表示一个单词
3.6 声明队列交换机
基于代码生成交换机
SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系
- Queue:用来声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binging:用于声明队列和交换机的绑定关系,可以用工厂类BingingBuilder构建
一般这些声明写在消费者中,发送者不需要关心声明,
fanout交换机的操作:
// 交换机的声明@Beanpublic FanoutExchange fanoutExchange(){
// return new FanoutExchange("hamll.fanout");return ExchangeBuilder.fanoutExchange("hmall.fanout").build();}// 队列的声明@Beanpublic Queue fanoutQueue1(){
// return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue1").build();}// 绑定关系的声明@Beanpublic Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
但是direct交换机的绑定存在问题,如下
// 交换机的声明@Beanpublic DirectExchange directExchange(){
// return new FanoutExchange("hamll.fanout");return ExchangeBuilder.directExchange("hamll.direct").build();}// 队列的声明@Beanpublic Queue directQueue1(){
// return new Queue("fanout.queue1");return QueueBuilder.durable("direct.queue1").build();}// 绑定关系的声明@Beanpublic Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}// 绑定关系的声明@Beanpublic Binding directQueue1BindingYellow(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("yellow");}
由于一次只能绑定一个关键词,所以当需要绑定的bindingKey越多,Bean也就越多,代码臃肿
解决方案:使用基于@RabbitListener注解来声明队列和交换机发方法
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), // 队列 exchange = @Exchange(name = "hmall.direct"), // 交换机key ={"red","blue"} // bindingKey))public void ListenDirectQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到消息:" + message+","+ LocalTime.now());}
3.7 消息转换器
建议使用JSON序列化代替默认的JDK虚拟化
- 赢入jackson依赖
- 在publisher和consumer中都配置MessageConverter
4. 黑马商城业务改造
需求:改造余额支付功能,不再同步调用交易服务的OpenFeign接口进行远程调用,而是采用异步MQ通知交易服务更新订单状态
- 对于边缘业务,不做远程调用,而是做消息通知(异步通知)