当前位置: 首页 > news >正文

Spring-rabbit使用实战六

目录

优雅实现 Spring RabbitMQ 多交换机多队列绑定

一、核心设计思路

二、完整实现代码

1. 配置定义(枚举方式)

2. 声明式配置工厂

3. 生产者服务(通用发送方法)

4. 消费者抽象基类

5. 具体消费者实现

6. 配置中心扩展(YAML 配置)

三、设计优势与扩展点

1. 架构优势

2. 扩展点设计

四、最佳实践建议

1.命名规范

2.监控增强

3.单元测试方案

4.消费者弹性配置

五、架构演进方向

1.配置中心集成

2.消息路由服务

3.流量控制中间件


优雅实现 Spring RabbitMQ 多交换机多队列绑定

在需要创建多个交换机、每个交换机绑定多个队列的场景中,通过合理设计可以显著提高代码的复用性和扩展性。以下是针对 3 个交换机各绑定 3 个队列的优雅实现方案:

一、核心设计思路

  1. 配置驱动:使用枚举或配置类定义交换机、队列和路由键

  2. 工厂模式:创建交换机、队列和绑定的工厂方法

  3. 自动绑定:通过反射或配置自动创建绑定关系

  4. 模板方法:复用消费者处理逻辑

二、完整实现代码

1. 配置定义(枚举方式)

public enum RabbitConfigEnum {// 订单业务ORDER_EXCHANGE("order.direct.exchange", ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("order.create.queue", "order.create"),new QueueConfig("order.pay.queue", "order.pay"),new QueueConfig("order.cancel.queue", "order.cancel"))),// 库存业务INVENTORY_EXCHANGE("inventory.topic.exchange", ExchangeTypes.TOPIC,Arrays.asList(new QueueConfig("inventory.deduct.queue", "inventory.deduct"),new QueueConfig("inventory.restore.queue", "inventory.restore.*"),new QueueConfig("inventory.alert.queue", "inventory.alert.#"))),// 通知业务NOTIFICATION_EXCHANGE("notification.fanout.exchange", ExchangeTypes.FANOUT,Arrays.asList(new QueueConfig("email.notification.queue", ""), // Fanout 不需要路由键new QueueConfig("sms.notification.queue", ""),new QueueConfig("push.notification.queue", "")));private final String exchangeName;private final ExchangeType exchangeType;private final List<QueueConfig> queueConfigs;RabbitConfigEnum(String exchangeName, ExchangeType exchangeType, List<QueueConfig> queueConfigs) {this.exchangeName = exchangeName;this.exchangeType = exchangeType;this.queueConfigs = queueConfigs;}// 队列配置内部类@Getter@AllArgsConstructorpublic static class QueueConfig {private final String queueName;private final String routingKey;}// 交换机类型枚举public enum ExchangeType {DIRECT, TOPIC, FANOUT}// 获取所有配置public static List<RabbitConfigEnum> getAllConfigs() {return Arrays.asList(values());}
}

2. 声明式配置工厂

@Configuration
public class RabbitMQConfigFactory {private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfigFactory.class);@Beanpublic Declarables createRabbitElements() {List<Declarable> declarables = new ArrayList<>();for (RabbitConfigEnum config : RabbitConfigEnum.getAllConfigs()) {// 创建交换机AbstractExchange exchange = createExchange(config.getExchangeName(), config.getExchangeType());declarables.add(exchange);logger.info("Created exchange: {}", config.getExchangeName());// 为交换机创建队列和绑定for (RabbitConfigEnum.QueueConfig queueConfig : config.getQueueConfigs()) {Queue queue = new Queue(queueConfig.getQueueName(), true);declarables.add(queue);Binding binding = createBinding(exchange, queue, queueConfig.getRoutingKey(), config.getExchangeType());declarables.add(binding);logger.info("Created queue '{}' bound to '{}' with routing key '{}'",queueConfig.getQueueName(),config.getExchangeName(),queueConfig.getRoutingKey());}}return new Declarables(declarables);}private AbstractExchange createExchange(String name, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT -> new DirectExchange(name, true, false);case TOPIC -> new TopicExchange(name, true, false);case FANOUT -> new FanoutExchange(name, true, false);};}private Binding createBinding(AbstractExchange exchange, Queue queue, String routingKey, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT, TOPIC -> BindingBuilder.bind(queue).to(exchange).with(routingKey);case FANOUT -> BindingBuilder.bind(queue).to((FanoutExchange) exchange);};}// 统一消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

3. 生产者服务(通用发送方法)

@Service
public class RabbitMQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 通用消息发送方法* * @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message, m -> {MessageProperties props = m.getMessageProperties();props.setContentType(MessageProperties.CONTENT_TYPE_JSON);props.setTimestamp(new Date());return m;});logger.debug("Sent message to {}[{}]: {}", exchange, routingKey, message);}/*** 按业务发送消息(推荐)* * @param config 业务配置* @param routingKey 路由键* @param message 消息内容*/public void sendByConfig(RabbitConfigEnum config, String routingKey, Object message) {sendMessage(config.getExchangeName(), routingKey, message);}
}

4. 消费者抽象基类

public abstract class AbstractRabbitConsumer<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitConsumer.class);/*** 通用消息处理模板* * @param message 消息内容* @param channel RabbitMQ通道* @param tag 消息标签*/@RabbitHandlerpublic void handleMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {// 1. 业务处理processMessage(message);// 2. 手动ACK确认channel.basicAck(tag, false);logger.debug("Message processed: {}", message);} catch (BusinessException e) {// 业务异常处理handleBusinessException(e, message, channel, tag);} catch (Exception e) {// 系统异常处理handleSystemException(e, message, channel, tag);}}/*** 业务处理抽象方法(子类实现)*/protected abstract void processMessage(T message) throws BusinessException;/*** 业务异常处理(可重写)*/protected void handleBusinessException(BusinessException e, T message, Channel channel, long tag) throws IOException {logger.error("Business error processing message: {}", message, e);// 拒绝消息但不重试channel.basicReject(tag, false);}/*** 系统异常处理(可重写)*/protected void handleSystemException(Exception e, T message, Channel channel, long tag) throws IOException {logger.error("System error processing message: {}", message, e);// 拒绝消息并重新入队channel.basicReject(tag, true);}
}

5. 具体消费者实现

// 订单创建消费者
@Component
@RabbitListener(queues = "order.create.queue")
public class OrderCreateConsumer extends AbstractRabbitConsumer<Order> {@Autowiredprivate InventoryService inventoryService;@Overrideprotected void processMessage(Order order) throws BusinessException {// 减库存inventoryService.deductStock(order.getProductId(), order.getQuantity());// 记录订单orderService.saveOrder(order);// 发送创建事件eventPublisher.publishOrderCreated(order);}// 重写异常处理@Overrideprotected void handleBusinessException(BusinessException e, Order order, Channel channel, long tag) throws IOException {if (e instanceof InventoryShortageException) {// 库存不足特殊处理orderService.markAsPending(order);channel.basicAck(tag, false);} else {super.handleBusinessException(e, order, channel, tag);}}
}// 库存告警消费者
@Component
@RabbitListener(queues = "inventory.alert.queue")
public class InventoryAlertConsumer extends AbstractRabbitConsumer<InventoryAlert> {@Overrideprotected void processMessage(InventoryAlert alert) {// 发送告警通知notificationService.sendAlert(alert.getProductId(), alert.getCurrentLevel());// 记录告警日志alertService.logAlert(alert);}
}

6. 配置中心扩展(YAML 配置)

# application.yml
spring:rabbitmq:host: rabbitmq-prod.example.comport: 5672username: ${RABBIT_USER}password: ${RABBIT_PASS}virtual-host: /prodlistener:simple:acknowledge-mode: manualconcurrency: 3max-concurrency: 10prefetch: 20# 自定义交换机配置(可选扩展)
rabbit:exchanges:- name: order.direct.exchangetype: DIRECTqueues:- name: order.create.queuerouting-key: order.create- name: order.pay.queuerouting-key: order.pay- name: order.cancel.queuerouting-key: order.cancel- name: inventory.topic.exchangetype: TOPICqueues:- name: inventory.deduct.queuerouting-key: inventory.deduct- name: inventory.restore.queuerouting-key: inventory.restore.*- name: inventory.alert.queuerouting-key: inventory.alert.#

三、设计优势与扩展点

1. 架构优势

设计特点优势应用场景
配置枚举化集中管理所有配置,避免硬编码多环境部署
工厂模式统一创建逻辑,减少重复代码新增交换机/队列
抽象消费者统一异常处理和ACK机制所有消费者
通用生产者简化消息发送接口所有业务场景

2. 扩展点设计

扩展点 1:动态添加新交换机

// 添加新业务配置
RabbitConfigEnum.NEW_EXCHANGE = new RabbitConfigEnum("new.exchange",ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("new.queue1", "key1"),new QueueConfig("new.queue2", "key2"))
);

扩展点 2:自定义绑定逻辑

// 重写绑定工厂方法
private Binding createCustomBinding(AbstractExchange exchange, Queue queue, String routingKey, ExchangeType type) {if ("special.binding".equals(routingKey)) {return BindingBuilder.bind(queue).to(exchange).with(routingKey).and(createCustomArguments()); // 自定义参数}return createBinding(exchange, queue, routingKey, type);
}

扩展点 3:基于配置文件的动态配置

@Configuration
@ConfigurationProperties(prefix = "rabbit")
public class DynamicRabbitConfig {private List<ExchangeConfig> exchanges;@Beanpublic Declarables dynamicDeclarables() {// 类似工厂方法实现,从配置文件读取}@Getter @Setterpublic static class ExchangeConfig {private String name;private String type;private List<QueueBinding> queues;}@Getter @Setterpublic static class QueueBinding {private String name;private String routingKey;}
}

四、最佳实践建议

1.命名规范

// 业务.类型.功能
String exchangeName = "order.direct.exchange";
String queueName = "inventory.topic.alert.queue";
String routingKey = "order.payment.completed";

2.监控增强

// 在生产者中添加监控埋点
public void sendMessage(String exchange, String routingKey, Object message) {Timer.Sample sample = Timer.start(metricsRegistry);// ...发送逻辑sample.stop(metricsRegistry.timer("rabbit.produce.time", "exchange", exchange, "routingKey", routingKey));
}

3.单元测试方案

@SpringBootTest
public class RabbitConfigTest {@Autowiredprivate RabbitAdmin rabbitAdmin;@Testpublic void testExchangeAndQueueCreation() {// 验证所有交换机已创建for (RabbitConfigEnum config : RabbitConfigEnum.values()) {Exchange exchange = new DirectExchange(config.getExchangeName());assertTrue(rabbitAdmin.getExchangeInfo(exchange.getName()) != null);// 验证队列绑定for (QueueConfig qc : config.getQueueConfigs()) {Queue queue = new Queue(qc.getQueueName());assertTrue(rabbitAdmin.getQueueInfo(queue.getName()) != null);}}}
}

4.消费者弹性配置

# 针对不同队列配置不同消费者参数
spring:rabbitmq:listener:order:concurrency: 5max-concurrency: 20notification:concurrency: 2max-concurrency: 5

五、架构演进方向

1.配置中心集成

2.消息路由服务

@Service
public class MessageRouter {private Map<MessageType, RabbitConfigEnum> routingMap;public void routeMessage(MessageType type, Object message) {RabbitConfigEnum config = routingMap.get(type);producer.sendByConfig(config, config.getDefaultKey(), message);}
}

3.流量控制中间件

@Around("@annotation(rabbitListener)")
public Object rateLimit(ProceedingJoinPoint joinPoint) {if (!rateLimiter.tryAcquire()) {// 返回特殊响应,触发消费者暂停return new RateLimitExceededResponse();}return joinPoint.proceed();
}

这种设计通过配置驱动、工厂模式和模板方法,实现了高可复用的 RabbitMQ 集成方案,能够轻松应对业务扩展需求,同时保持代码的简洁性和可维护性。

http://www.lryc.cn/news/610556.html

相关文章:

  • 智慧会所:科技赋能,开启休闲新体验
  • 计算机算术5-整形除法
  • 代码训练营DAY53 第十一章:图论part04
  • bpf系统调用及示例
  • K8S 性能瓶颈排查
  • CVE-2017-8291源码分析与漏洞复现(PIL远程命令执行漏洞)
  • 软件测试中,pytest 框架如何运行上传失败的测试用例?
  • docker国内镜像源列表
  • 软件测试中,pytest 如何运行多个文件或整个目录?
  • Python入门Day15:面向对象进阶(类变量,继承,封装,多态)
  • springboot + maven 使用资源占位符实现动态加载配置文件
  • Modstart 请求出现 Access to XMLHttpRequest at ‘xx‘
  • imx6ull-驱动开发篇9——设备树下的 LED 驱动实验
  • ubuntu的压缩工具zip的安装和使用
  • 【C++】类和对象1
  • 力扣106:从中序与后序遍历序列构造二叉树
  • 「PromptPilot 大模型智能提示词平台」—— PromptPilot × 豆包大模型 1.6:客户投诉邮件高效回复智能提示词解决方案
  • 工业级 CAN 与以太网桥梁:串口服务器CAN通讯转换器深度解析(上)
  • 【科研绘图系列】R语言绘制误差棒图
  • 姜 第四章 线性方程组
  • shmget等共享内存系统调用及示例
  • uniapp 类似popover气泡下拉框组件
  • Maven和Gradle在构建项目上的区别
  • uniapp Android App集成支付宝的扫码组件mPaaS
  • Linux驱动25 --- RkMedia音频API使用增加 USB 音视频设备
  • Linux驱动24 --- RkMedia 视频 API 使用
  • 技术文章推荐|解析 ESA 零售交易方案(技术分析+案例拆解)
  • 基于k8s环境下的pulsar常用命令(下)
  • JavaWeb02——基础标签及样式(黑马视频笔记)
  • 203.移除链表元素 707.设计链表 206.反转链表