当前位置: 首页 > news >正文

RabbitMQ使用topic Exchange实现微服务分组订阅

        案例场景:用户下单后需要多个微服务(如营销、会员)分别订阅并处理订单事件,且每个微服务可能有多个集群实例,需要保证同一个微服务的集群中,只有一个实例消费到消息。

        不同于Kafka和rocketMQ有分组消费的功能,rabbitMQ需要通过topic Exchange实现。

        1、消息设计

{"event_type": "order_created",  // 事件类型(如下单、支付成功)"order_id": "123456","user_id": "user_789","items": [{"product_id": "p1001", "quantity": 2},{"product_id": "p1002", "quantity": 1}],"total_amount": 399.00,"created_at": "2025-07-01T12:00:00Z"
}

        2、rabbitMQ设计

        使用 topic 类型的 Exchange:支持灵活的路由规则,不同微服务可通过 binding key 订阅特定事件。
        Exchange 名称示例:order_events。

微服务	    队列名	            绑定键(binding key)
营销系统	    marketing_queue	    order.created.marketing
会员系统	    member_queue	    order.created.member绑定键规则:<event_type>.<microservice_name>,便于后续扩展(如 order.cancel.marketing)。

        3、消息发布

        生产者(订单服务)发布消息时,指定路由键(routing key) 为 order.created

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.fasterxml.jackson.databind.ObjectMapper;public class OrderProducer {private static final String EXCHANGE_NAME = "order_events";private static final String ROUTING_KEY = "order.created";public static void main(String[] args) throws Exception {// 1. 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 2. topic Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic", true);// 3. 构造消息OrderEvent event = new OrderEvent();event.setEvent_type("order_created");event.setOrder_id("123456");event.setUser_id("user_789");event.setItems(List.of(Map.of("product_id", "p1001", "quantity", 2)));event.setTotal_amount(399.00);event.setCreated_at("2025-07-02T22:00:00Z");ObjectMapper mapper = new ObjectMapper();String messageJson = mapper.writeValueAsString(event);// 4. 发布消息channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true, // 持久化消息new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化.build(),messageJson.getBytes());System.out.println("Sent order event: " + messageJson);}}
}

        4、消息订阅(营销服务)

        不同服务只需要修改 QUEUE_NAME 和 BINDING_KEY 就可以了。 

import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;public class MarketingConsumer {private static final String QUEUE_NAME = "marketing_queue";private static final String BINDING_KEY = "order.created.marketing";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 1. 声明队列并绑定到 Exchangechannel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, "order_events", BINDING_KEY);// 2. 消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");ObjectMapper mapper = new ObjectMapper();OrderEvent event = mapper.readValue(messageJson, OrderEvent.class);// 3. 业务逻辑:发送营销短信System.out.println("Marketing: Sending SMS for order " + event.getOrder_id());// 4. 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 5. 设置手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

http://www.lryc.cn/news/579043.html

相关文章:

  • docker离线/在线环境下安装elasticsearch
  • IO--进程实操
  • 【新手小白的嵌入式学习之路】-STM32的学习_GPIO 8种模式学习心得
  • ai之RAG本地知识库--基于OCR和文本解析器的新一代RAG引擎:RAGFlow 认识和源码剖析
  • LeetCode--39.组合总和
  • Lua 安装使用教程
  • CRMEB Pro版v3.3源码全开源+PC端+Uniapp前端+搭建教程
  • 【C++】第十三节—stack、queue、priority_queue、容器适配器(介绍和使用+模拟实现+OJ题)
  • 客服机器人知识库怎么搭?智能客服机器人3种方案深度对比(含零售落地案例)
  • 去中心化身份:2025年Web3身份验证系统开发实践
  • 专题:2025AI营销市场发展研究报告|附400+份报告PDF汇总下载
  • 告别 ifconfig:openEuler 网络配置的现代化之路
  • 通俗理解JVM细节-面试篇
  • UI前端大数据处理策略优化:基于云计算的数据存储与计算
  • kotlin 通道trysend方法
  • ZYNQ学习记录FPGA(六)程序固化Vivado+Vitis
  • GO Web 框架 Gin 完全解析与实践
  • 【Unity】MiniGame编辑器小游戏(九)打砖块【Breakout】
  • 云上配送革命:亚矩云手机如何重塑Uber Eats的全球外卖生态
  • 服务器异常宕机或重启导致 RabbitMQ 启动失败问题分析与解决方案
  • 2025年Java常见面试题(持续更新)
  • Maven工具学习使用(十三)——Maven Wrapper命令解析与使用
  • 在设计提示词(Prompt)时,关于信息位置的安排z怎么 结合模型特性和任务目标
  • 量子算法:微算法科技用于定位未知哈希图的量子算法,网络安全中的哈希映射突破
  • Linux 后台启动java jar 程序 nohup java -jar
  • pytest之fixture中yield详解
  • 文心快码答用户问|Comate AI IDE专场
  • UniApp完美对接RuoYi框架开发企业级应用
  • Droplets:趣味AI课程,开启语言学习新旅程
  • 【趣谈】Android多用户导致的UserID、UID、shareUserId、UserHandle术语混乱讨论