当前位置: 首页 > news >正文

RabbitMQ03——面试题

目录

一、mq的作用和使用场景

二、mq的优点

2.1架构设计优势

2.2功能特性优势

 2.3性能与可靠性优势

2.4生态系统优势

2.5对比优势

三、mq的缺点

3.1性能与扩展性限制

3.2功能局限性

3.3运维复杂度

3.4与其他消息队列的对比劣势

四、mq相关产品,每种产品的特点

4.1开源产品

4.2 商业/云产品

五、rabbitmq的搭建过程

六、rabbitmq相关角色

七、rabbitmq内部组件

八、生产者发送消息的过程?

九、消费者接收消息过程?

十、springboot项目中如何使用mq?

十一、如何保障消息不丢失?

十二、死信交换机和死信队列

RabbitMQ规定消息符合以下某种情况时,将会成为死信

十三、延迟队列简介


一、mq的作用和使用场景

作用:

RabbitMQ 是一个开源的消息代理和队列服务器,主要用于实现应用程序之间的异步通信和解耦。它的主要作用包括:

  1. 应用解耦:将相互依赖的应用系统分离,降低系统间的耦合度

  2. 异步通信:实现系统间的非实时、异步消息传递

  3. 流量削峰:缓冲突发流量,避免系统被压垮

  4. 消息分发:支持多种消息路由模式(点对点、发布/订阅等)

  5. 可靠传递:提供消息持久化、确认机制等保证消息可靠传输

使用场景:

使用场景场景示例优势
异步任务处理用户注册后发送欢迎邮件/短信主流程快速响应,耗时操作异步执行
订单支付成功后通知物流系统
图片/视频上传后的处理任务
应用解耦电商系统中订单系统与库存系统分离系统可独立开发、部署和扩展
主业务系统与数据分析系统解耦
流量削峰秒杀/抢购活动中的请求排队平滑流量波动,保护后端系统
突发性大流量日志收集
分布式系统通信微服务间的事件通知提供标准协议支持多语言客户端(如Java、Python等)
跨语言系统间的数据交换
数据同步主数据库变更同步到缓存可靠的消息传递机制
多个系统间的数据一致性保证

1、抢购活动,削峰填谷,防止系统崩塌。

2、延迟信息处理,比如 10 分钟之后给下单未付款的用户发送邮件提醒。

3、解耦系统,对于新增的功能可以单独写模块扩展,比如用户确认评价之后,新增了给用户返积分的功能,这个时候不用在业务代码里添加新增积分的功能,只需要把新增积分的接口订阅确认评价的消息队列即可,后面再添加任何功能只需要订阅对应的消息队列即可。

二、mq的优点

2.1架构设计优势

优点说明
松耦合设计生产者和消费者无需相互感知,通过消息队列实现解耦
异步通信支持非阻塞式消息传递,提高系统响应速度
缓冲能力有效应对流量洪峰,实现平滑的系统负载
扩展性强可通过集群轻松扩展处理能力

2.2功能特性优势

优点技术实现
多协议支持原生支持AMQP 0-9-1,同时支持STOMP、MQTT等协议
灵活路由提供4种交换机类型(Direct/Topic/Fanout/Headers)实现复杂消息路由
消息可靠性支持持久化、生产者确认、消费者ACK机制
高可用性支持镜像队列、集群部署,保证服务连续性
管理便捷提供Web管理界面和HTTP API,支持监控和运维

 2.3性能与可靠性优势

  1. 消息持久化

    • 支持将消息和队列持久化到磁盘

    • 即使服务器重启也能保证消息不丢失

  2. 集群支持

    • 支持多节点集群部署

    • 镜像队列可自动同步消息

  3. 灵活的消息确认机制

    // 消费者确认模式选择
    channel.basicConsume(queue, autoAck, consumer);
  • 自动确认(autoAck=true)

  • 手动确认(autoAck=false)

2.4生态系统优势

方面支持情况
多语言客户端支持Java/.NET/Python/Ruby/Go等几乎所有主流语言
社区支持活跃的开源社区,丰富的文档和解决方案
云平台集成完美支持Kubernetes,各大云平台提供托管服务
插件体系支持多种官方/社区插件(如延迟消息、MQTT协议等)

2.5对比优势

特性RabbitMQKafkaActiveMQ
消息延迟支持(需插件)不支持支持
吞吐量万级QPS百万级QPS万级QPS
协议支持多协议自定义协议多协议
运维复杂度

三、mq的缺点

3.1性能与扩展性限制

缺点具体表现对比数据
吞吐量有限单节点最佳吞吐约5万-10万msg/s,低于Kafka等分布式队列Kafka单节点可达百万级msg/s
队列数量限制单个集群建议不超过5万个队列,过多队列会导致性能下降Kafka的topic分区数可轻松过万
消息堆积能力弱内存受限,默认当内存使用超过40%会阻塞生产者Kafka依赖磁盘存储,堆积能力更强

3.2功能局限性

缺点具体表现解决方案
消息顺序不严格在多消费者场景下无法保证全局消息顺序需单消费者+单队列实现有限顺序保证
延迟消息实现复杂原生不支持延迟队列,需通过TTL+死信队列间接实现安装rabbitmq_delayed_message_exchange插件
事务性能差AMQP事务性能低下,提交事务的吞吐量下降10倍以上使用Confirm模式替代

3.3运维复杂度

缺点具体表现影响范围
集群管理复杂镜像队列配置繁琐,节点增减需要手动维护增加运维成本
内存管理敏感内存使用超过阈值(默认40%)会阻塞生产者,需精细调优生产环境需设置监控告警
跨机房同步困难原生不支持多机房同步,需借助Shovel或Federation插件异地容灾实现成本高

3.4与其他消息队列的对比劣势

功能维度RabbitMQKafkaRocketMQPulsar
消息堆积能力内存受限磁盘存储,极强磁盘存储,强分层存储,最强
严格顺序仅单队列保证分区内严格有序分区严格有序分区严格有序
横向扩展队列粒度扩展困难分区自动平衡自动负载均衡自动扩展
延迟消息需插件支持不支持原生支持原生支持
事务支持性能差支持支持支持

四、mq相关产品,每种产品的特点

4.1开源产品

1. RabbitMQ

特点说明
协议支持支持AMQP、MQTT、STOMP等多种协议
消息路由提供4种交换机类型,路由策略最灵活
可靠性支持持久化、确认机制、事务(性能低)
管理界面提供功能完善的Web管理控制台
适用场景企业级应用、需要复杂路由的中等规模消息处理

2. Apache Kafka

特点说明
高吞吐设计用于日志处理,单机可达百万级TPS
持久化消息持久化到磁盘,支持长期保存
分区顺序保证分区内消息严格有序
流处理与Kafka Streams深度集成
适用场景日志收集、流数据处理、大数据管道

3. Apache RocketMQ

特点说明
事务消息提供完整的事务消息解决方案
延迟消息原生支持18个级别的延迟消息
消息轨迹可追踪消息全生命周期
双主双从高可用架构设计
适用场景金融支付、电商交易等对一致性要求高的场景

4. Apache Pulsar

特点说明
分层架构计算与存储分离,支持无限扩展
多租户原生支持多租户隔离
消息模型统一队列和流处理模型
地理复制内置多机房同步机制
适用场景云原生环境、多租户SaaS应用、全球分布式系统

5. ActiveMQ

特点说明
JMS支持完整实现JMS 1.1规范
协议支持支持OpenWire、STOMP、AMQP等
嵌入式可作为嵌入式消息系统使用
适用场景传统Java EE应用、需要JMS规范的项目

4.2 商业/云产品

1. AWS SQS/SNS

特点说明
全托管无需运维基础设施
弹性扩展自动扩展处理能力
与AWS集成深度集成Lambda、EC2等服务
适用场景运行在AWS上的应用,需要轻量级消息服务

2. Azure Service Bus

特点说明
企业级特性支持会话、死信队列、计划消息等
高级消息模式提供主题、队列和中继三种模式
Geo-DR内置异地灾难恢复
适用场景企业级应用集成,特别是微软技术栈项目

3. Google Cloud Pub/Sub

特点说明
全球消息传递消息可在全球任何区域生产和消费
强一致性保证至少一次投递,支持精确一次处理
实时分析集成无缝对接BigQuery等数据分析服务
适用场景全球分布式系统、实时分析管道

4. Alibaba Cloud MQ

特点说明
多协议支持兼容RabbitMQ、RocketMQ和Kafka协议
金融级可靠提供金融级消息可靠性保证
全链路追踪支持消息生产、存储、消费全链路追踪
适用场景阿里云上的金融、电商等关键业务系统
  1. 需要复杂路由 → RabbitMQ

  2. 超高吞吐日志 → Kafka

  3. 金融交易场景 → RocketMQ

  4. 云原生/多租户 → Pulsar

  5. 全托管服务 → AWS SQS/Azure Service Bus

  6. 传统Java EE → ActiveMQ

五、rabbitmq的搭建过程

docker下安装配置rabbitmq
1、拉取镜像

docker pull rabbitmq:latest
​
docker pull rabbitmq:3.9.0-management


2、创建并启动容器

docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq  rabbitmq:3.9.0-management



3、查看容器状态

docker ps

如果容器状态显示为Up,并且端口映射正确,那么RabbitMQ服务已经成功启动。

4、查看容器日志

docker logs rabbitmq

访问RabbitMQ管理界面

5、创建用户赋权

# 进入容器
docker exec -it rabbitmq bash#启用 RabbitMQ 的管理控制台插件
rabbitmq-plugins enable rabbitmq_management
​
# 在容器内执行
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

6、使用浏览器打开RabbitMQ管理界面。
默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL:

http://<宿主机IP地址>:15672

六、rabbitmq相关角色

RabbitMQ 中重要的角色有:生产者、消费者和代理:

生产者:消息的创建者,负责创建和推送数据到消息服务器;

消费者:消息的接收方,用于处理数据和确认消息;

代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

七、rabbitmq内部组件

1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交换器):用于接受、分配消息。
4、Queue(队列):用于存储生产者的消息。
5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。
6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。

八、生产者发送消息的过程?

发送消息的核心步骤

  1. 建立连接

  2. 创建通道

  3. 声明交换机/队列(可选)

  4. 发布消息

  5. 处理确认(可选)

  6. 关闭连接

首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 RabbitMQ 就创建了一条 amqp 信道(channel),信道是创建在“真实” tcp 上的虚拟连接,amqp 命令都是通过信道发送出去的,每个信道都会有一个唯一的 id,不论是发布消息,订阅队列都是通过这个信道完成的。

九、消费者接收消息过程?

  1. 建立连接和通道

  2. 声明队列(可选)

  3. 设置QoS(服务质量控制)

  4. 注册消费者

  5. 消息处理

  6. 发送确认(ACK/NACK)

  7. 处理失败消息

十、springboot项目中如何使用mq?

3.1引入jar包
spring-boot-starter-amqp.jar

spring-boot-starter-web.jar

3.2application.yml配置

连接rabbitmq下相关信息
host
port
username
password
virtualHost

3.3使用rabbitmq模版工具类 rabbitTemplate amqpTemplate

发送消息方法
send

接收消息      注解controller控制层类上,添加注解可以实时接收消息@RabbitListener
receive

3.4细化

项目启动时,自动创建交换机、创建队列、指定交换机和队列的绑定

十一、如何保障消息不丢失?

1、发送阶段:发送阶段保障消息到达交换机 事务机制|confirm确认机制

2、存储阶段:持久化机制 交换机持久化、队列的持久化、消息内容的持久化

3、消费阶段:消息的确认机制 自动ack|手动ack

接收方消息确认机制

自动ack|手动ack

spring:rabbitmq:host: 主机号port: 5672username: adminpassword: 123456virtual-host: /yan3listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual
package com.hl.rabbitmq01.web;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import java.io.IOException;
​
@RestController
@RequestMapping("/c")
public class ConsumerController {
​@RabbitListener(queues = {"topicQueue01"})public void receive(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println(msg);//业务逻辑 比如传入订单id,根据订单id,减少库存、支付等,// 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息if(msg.length() >= 5){//确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//拒绝消息 not ack// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}
​
​}
}

消息的持久化机制

交换机的持久化

队列的持久化

消息内容的持久化

package com.hl.rabbitmq01.direct;
​
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/*
生产者  javaSE方式简单测试
发布订阅-------direct模型
生产者----消息队列----消费者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、创建连接Connection connection = MQUtil.getConnection();//2、基于连接,创建信道Channel channel = connection.createChannel();//3、基于信道,创建队列/*参数:1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建2. durable:是否持久化,当mq重启之后,消息还在3. exclusive:* 是否独占。只能有一个消费者监听这队列4。当Connection关闭时,是否删除队列autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*/channel.queueDeclare("directQueue01", true, false, false, null);channel.queueDeclare("directQueue02", false, false, false, null);/*声明交换机参数1:交换机名称参数2:交换机类型*/channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);/*绑定交换机和队列参数1:队列名参数2:交换机名称参数3:路由key 广播模型 不支持路由key  ""*/channel.queueBind("directQueue01","directExchange01","error");channel.queueBind("directQueue02","directExchange01","error");channel.queueBind("directQueue02","directExchange01","info");channel.queueBind("directQueue02","directExchange01","trace");//发送消息到消息队列/*参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称,简单模式下路由名称使用消息队列名称3. props:配置信息4. body:发送消息数据*/
​channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
​
​//4、关闭信道,断开连接channel.close();connection.close();}
}
package com.hl.rabbitmq01.web;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
​
import java.io.IOException;
import java.nio.charset.StandardCharsets;
​
@RestController
@RequestMapping("/p")
public class ProducerController {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;
​
​@RequestMapping("/send")public void send(@RequestParam(defaultValue = "user") String key,@RequestParam(defaultValue = "hello") String msg) throws IOException {//amqpTemplate.convertAndSend("topicExchange", key, msg);
//        rabbitTemplate.convertAndSend("topicExchange",key,msg);Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false); //false 非事务模式运行 无需手动提交channel.basicPublish("topicExchange", key,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}
}
​
/*
创建交换机*/
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topicExchange").durable(true)  //是否支持持久化机制.build();
}
/*
创建队列*/
@Bean
public Queue queue(){return QueueBuilder.durable("topicQueue01").build();
}

发送方的消息确认机制

1、事务机制

消耗资源

RabbitMQ中与事务有关的主要有三个方法:

  • txSelect() 开始事务

  • txCommit() 提交事务

  • txRollback() 回滚事务

txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。

当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。

示例

@RestController
public class RabbitMQController {
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@RequestMapping("/send")public String sendMessage(String message){rabbitTemplate.setChannelTransacted(true); //开启事务操作rabbitTemplate.execute(channel -> {try {channel.txSelect();//开启事务
​channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
​int i = 5/0;
​channel.txCommit();//没有问题提交事务}catch (Exception e){e.printStackTrace();channel.txRollback();//有问题回滚事务}
​return null;});
​return "success";}
​
}

消费者没有任何变化。

通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。

2、confirm确认机制

推荐

同步通知

channel.confirmSelect(); //开始confirm操作
​
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
​
if (channel.waitForConfirms()){System.out.println("发送成功");
}else{//进行消息重发System.out.println("消息发送失败,进行消息重发");
}

异步通知

channel.confirmSelect();
​
channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker,就会发送一条ack消息@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("发送消息成功");}
​//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("发送消息失败,重新发送消息");}
});
​
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());

十二、死信交换机和死信队列

在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。

死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。

死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机

RabbitMQ规定消息符合以下某种情况时,将会成为死信

  • 队列消息长度到达限制(队列消息个数限制);

  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  • 原队列存在消息过期设置,消息到达超时时间未被消费;

死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。

 Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","deadExchange");//当前队列和死信交换机绑定map.put("x-dead-letter-routing-key","user.#");//当前队列和死信交换机绑定的路由规则
//        map.put("x-max-length",2);//队列长度map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
​
//        return QueueBuilder.durable("topicQueue01").build();return QueueBuilder.durable("topicQueue").withArguments(map).build();

十三、延迟队列简介

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现

ttl+死信队列

ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。

延迟插件

人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

http://www.lryc.cn/news/596691.html

相关文章:

  • 为什么要微调大语言模型
  • 论文笔记 | Beyond Pick-and-Place: Tackling Robotic Stacking of Diverse Shapes
  • 解决pip指令超时问题
  • 数据结构 堆(2)---堆的实现
  • LeetCode 热题100:42.接雨水
  • Unity UI的未来之路:从UGUI到UI Toolkit的架构演进与特性剖析(1)
  • 业务流逻辑如何搭建?为何橙武平台选用了 LogicFlow?​
  • day19 链表
  • 程序是如何生成的-以c语言为例
  • 信息学奥赛一本通 1553:【例 2】暗的连锁
  • 前端_CSS复习
  • 【React 入门系列】React 组件通讯与生命周期详解
  • 高可用架构模式——数据集群和数据分区
  • 单细胞转录组学+空间转录组的整合及思路
  • OneCode3.0 UI组件注解详解手册
  • 【vscode】vscode中python虚拟环境的创建
  • 回调地狱及解决方法
  • error C++17 or later compatible compiler is required to use ATen.
  • 【coze扣子】第1篇:coze快速入门
  • 威胁情报:Solana 开源机器人盗币分析
  • 以Java程序员角度理解MCP
  • 学习游戏制作记录(战斗系统简述以及击中效果)7.22
  • [c++11]std::function/bind
  • 基于SpringBoot+Vue的班级管理系统(Echarts图形化分析)
  • 101.对称二叉树
  • ubuntu 20.04 安装 cmake 3.26
  • VS Code 美化插件
  • 3ds Max 云端渲染插件 - 完整 Python 解决方案
  • Mysql-场景篇-2-线上高频访问的Mysql表,如何在线修改表结构影响最小?-1--Mysql8.0版本后的INSTANT DDL方案(推荐)
  • 基于mysql云数据库创建和美化表格,对比分析Power BI和Quick BI的功能优劣