当前位置: 首页 > news >正文

RabbitMQ 4.1.1-Local random exchange体验

Local Random Exchange

一种 RabbitMQ 4.0+ 引入的新型交换机,主要是为 request-reply(RPC)场景 设计的。

  • 使用这种交换机时,消息只会被路由到本地节点上的队列,可以确保极低的消息发布延迟。
  • 如果有多个本地队列绑定到该交换机,它会随机选择一个队列接收消息。

关键点总结:

  • 本地传输:不会把消息发到其他节点的队列。
  • 随机选队列:多个本地队列中随机挑一个。
  • 发布快:避免了跨节点网络通信,延迟低。
  • 最适合用于 RPC 模式,即“请求-响应”

建议将 Local Random Exchange 和 exclusive(独占)队列搭配使用,这样可以为 RPC 场景提供更低延迟的组合。
注意

  • Exclusive 队列是 RabbitMQ 中只对某个连接开放的临时队列(通常用于响应)。
  • LRE + Exclusive Queue,可以避免消息在集群中转发,提高响应速度。

LRE 不转发消息到其他节点,所以如果当前节点没有合适的队列,消息会被直接丢弃!
所以使用时你必须确保每个节点上都至少有一个消费者绑定的队列

在这里插入图片描述

在 RabbitMQ 前面加负载均衡器(load balancer)会让这种交换机类型几乎无法正常工作。

原因分析

  • Local Random Exchange 依赖于消息被投递到“本地绑定队列(local queues)”
  • 如果用了负载均衡,客户端连接可能随机落在任何节点上,消息将发给该节点的本地队列
  • 如果该节点上没有消费者绑定本地队列,消息就会被丢弃

实操如下
application.properties

# JVM内存配置
# 设置较小的堆内存,避免占用过多系统资源
spring.jvm.memory=-Xmx256m -Xms128m -XX:MaxMetaspaceSize=128m# 设置较小的线程栈大小
spring.jvm.thread-stack-size=-Xss256k# 启用GC日志,帮助诊断内存问题
spring.jvm.gc-log=-Xlog:gc*:file=./logs/gc.log:time,uptime,level,tags:filecount=5,filesize=10m# 设置较小的代码缓存大小
spring.jvm.code-cache=-XX:ReservedCodeCacheSize=128m# 启用内存压缩指针基址设置,将Java堆放在4GB以上地址空间
spring.jvm.heap-base=-XX:HeapBaseMinAddress=4g# 启用G1垃圾收集器的更积极设置
spring.jvm.gc-tuning=-XX:G1ReservePercent=10 -XX:G1HeapRegionSize=4m -XX:InitiatingHeapOccupancyPercent=30# 禁用显式GC调用
spring.jvm.disable-explicit-gc=-XX:+DisableExplicitGC

application.yml

#定义要使用的交换机和队列名称
spring:application:name: local-random-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host# 连接超时时间connection-timeout: 10000# 日志配置
logging:level:org.springframework.amqp: INFO     # AMQP日志级别com.example: DEBUG                 # 应用日志级别
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ 配置类,用于创建 Local Random Exchange(本地随机交换机)和绑定的 RPC 队列。* 本配置主要用于实现基于 RabbitMQ 的 RPC 模式,使用 Local Random Exchange 类型降低延迟。*/
@Configuration
public class RabbitConfig {// Local Random Exchange 名称(自定义交换机)public static final String LRE_EXCHANGE = "lre.exchange";// RPC 使用的队列名称public static final String RPC_QUEUE_NAME = "rpc.queue";/*** 声明一个 Local Random Exchange(x-local-random 类型的交换机)。** - durable: true 表示交换机会持久化* - autoDelete: false 表示不会在没有绑定队列时自动删除* - arguments: 可选参数,此处为空*/@Beanpublic CustomExchange lreExchange() {Map<String, Object> args = new HashMap<>();return new CustomExchange(LRE_EXCHANGE, "x-local-random", true, false, args);}/*** 声明一个 RPC 队列。** - durable: false 表示不持久化(重启后丢失)* - exclusive: false 表示不是只被当前连接独占* - autoDelete: true 表示连接断开后自动删除队列** 如果你要模拟 RPC Client 的 exclusive 回调队列,建议用 `exclusive = true`。*/@Beanpublic Queue rpcQueue() {return new Queue(RPC_QUEUE_NAME, false, false, true);}/*** 将 RPC 队列绑定到 Local Random Exchange 上。** - routingKey 设置为 "",因为 Local Random Exchange 不关心路由键*/@Beanpublic Binding binding(Queue rpcQueue, CustomExchange lreExchange) {return BindingBuilder.bind(rpcQueue).to(lreExchange).with("").noargs();}
}

方式一、手动监听

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** 模拟 RPC 客户端,用于通过 RabbitMQ 的 Local Random Exchange 发送请求并接收异步响应。*/
@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 向服务器发送请求,并设置回调队列接收响应。** @param message 请求消息内容* @return 返回一个确认字符串(实际响应在回调中处理)*/public String sendRequest(String message) throws Exception {// 生成唯一标识 correlationId,用于标识请求-响应配对String correlationId = UUID.randomUUID().toString();// 临时生成一个独占的匿名回调队列(例如 amq.gen-xxxxxx)String replyQueue = rabbitTemplate.execute(channel -> channel.queueDeclare().getQueue());// 设置 RabbitTemplate 的回调地址(其实不会生效于 send 模式,仅用于演示)rabbitTemplate.setReplyAddress(replyQueue);rabbitTemplate.setReplyTimeout(5000); // 设置超时时间(ms)rabbitTemplate.setCorrelationKey("correlation_id"); // 设置用于匹配的属性名(可选)// 设置监听器容器,监听回调队列中的响应消息SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());container.setQueueNames(replyQueue); // 指定监听的队列container.setMessageListener(new MessageListenerAdapter(new Object() {// 定义接收到消息后的处理方法(方法名必须与监听器默认匹配或显式指定)@SuppressWarnings("unused")public void handleMessage(byte[] reply) {String response = new String(reply, StandardCharsets.UTF_8);System.out.println("Got reply: " + response);// 实际中这里应唤醒等待线程或放入响应Map中(基于 correlationId)}}));container.start(); // 启动监听器容器// 构造请求消息,设置 reply_to 和 correlation_id 属性MessageProperties props = new MessageProperties();props.setReplyTo(replyQueue);             // 告诉服务端响应要发到这个队列props.setCorrelationId(correlationId);    // 服务端会原样返回,用于客户端识别对应响应Message request = new Message(message.getBytes(), props);// 通过 RabbitTemplate 发送消息到本地随机交换机(Local Random Exchange)rabbitTemplate.send(RabbitConfig.LRE_EXCHANGE, "", request);return "Request sent with correlationId: " + correlationId;}
}

方式二、推荐写法
也可以用使用 Spring AMQP 的官方推荐 RPC 模式(即 convertSendAndReceive())的实现方式。这种方式完全利用了 RabbitTemplate 的自动 reply-to、correlationId、超时机制 —— 更加简单可靠

package com.example.product;import com.example.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public String sendRequest(String message) {// 设置超时时间(可选)rabbitTemplate.setReplyTimeout(5000);// 使用 convertSendAndReceive 会自动:// 1. 创建一个临时 reply queue(exclusive)// 2. 设置 reply_to 和 correlation_id// 3. 等待结果并返回Object response = rabbitTemplate.convertSendAndReceive(RabbitConfig.LRE_EXCHANGE, "", message);if (response != null) {return "Received response: " + response.toString();} else {return "No response received (timeout or error)";}}
}

两者优点总结

功能原来方式(手动监听)convertSendAndReceive()(推荐)
reply_to自动处理❌ 手动✅ 自动
correlation_id 匹配❌ 手动✅ 自动
超时控制❌ 复杂✅ 简单
代码复杂度
推荐程度✅✅✅

RPC服务端处理
方式一 手动

package com.example.consumer;import com.example.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class RpcServer {/*** RabbitMQ RPC 服务端处理方法* * 使用 @RabbitListener 监听指定队列,当接收到客户端请求时,手动获取 reply_to 和 correlation_id,* 并通过底层 channel 手动发送响应消息。** @param message        收到的消息正文* @param correlationId  唯一标识此次 RPC 请求的 ID(由客户端生成并设置)* @param replyTo        回调队列(客户端临时队列)* @param requestMessage 原始 AMQP 消息对象* @param channel        底层通信通道,用于手动发送响应* @return null(返回值不会被用来发送响应,因为我们是手动发送的)*/@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)public String handleRpc(String message,@Header(AmqpHeaders.CORRELATION_ID) String correlationId,@Header(AmqpHeaders.REPLY_TO) String replyTo,Message requestMessage,Channel channel) throws IOException {// 构造服务端响应内容String response = "Processed: " + message;// 打印收到的信息和即将回应的队列System.out.println("replyTo: " + replyTo + ", Server received: " + message + ", correlationId: " + correlationId);// 构造响应消息的属性,确保带上原始 correlationIdMessageProperties replyProps = new MessageProperties();replyProps.setCorrelationId(correlationId);// 构造响应消息对象Message reply = new Message(response.getBytes(), replyProps);// 手动发送响应消息到客户端指定的临时队列channel.basicPublish("", replyTo, null, reply.getBody());// 因为手动处理了响应,不需要 Spring 自动回传return null;}
}

方式二自动处理

@RabbitListener(queues = RabbitConfig.RPC_QUEUE_NAME)
public String handleRpc(String message) {System.out.println("Server received: " + message);return "Processed: " + message;
}

运行结果

Request sent with correlationId: 9cf6df25-3e02-47da-96ad-23a21791b391
replay:amq.gen-CcSRdsuLJtjtXOzFUE3Eug Server received: 0测试0 correlationId:9cf6df25-3e02-47da-96ad-23a21791b391
Got reply: Processed: 0测试0
Request sent with correlationId: d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
replay:amq.gen-jnFzJQallOE6QRkZEZyn-Q Server received: 3测试1 correlationId:d1477ff0-84dd-4bf6-ba8d-d8b613fbcadc
Got reply: Processed: 3测试1
Request sent with correlationId: 2009671b-ef8d-418c-ae9b-c58c8e0dac83
replay:amq.gen--tLpLz3xs9p_BEZmqJUjFg Server received: 6测试2 correlationId:2009671b-ef8d-418c-ae9b-c58c8e0dac83
Got reply: Processed: 6测试2
Request sent with correlationId: 6637a3dd-4e24-48e5-871f-cd671ea6d9b6
replay:amq.gen-CejNGqwNk6bWPkxrQLvH7Q Server received: 9测试3 correlationId:6637a3dd-4e24-48e5-871f-cd671ea6d9b6
Got reply: Processed: 9测试3
Request sent with correlationId: c994fab1-75c4-4618-8af8-b03f2fcdfa6f
replay:amq.gen-mdKE_hhHhj_ZEgT-fIm4nw Server received: 12测试4 correlationId:c994fab1-75c4-4618-8af8-b03f2fcdfa6f
Got reply: Processed: 12测试4
Request sent with correlationId: b27d1409-d595-47f8-b920-2d4ad23288d2
replay:amq.gen-ofZgztMXNh9MMEejK6DDGA Server received: 15测试5 correlationId:b27d1409-d595-47f8-b920-2d4ad23288d2
Got reply: Processed: 15测试5
Request sent with correlationId: adc98f0d-5270-4033-86c0-e863cd56ecee
replay:amq.gen-xKkf-7LcEhOzamv892nL8A Server received: 18测试6 correlationId:adc98f0d-5270-4033-86c0-e863cd56ecee
Got reply: Processed: 18测试6
Request sent with correlationId: 87f6722d-e974-474d-a79c-9aea69401fa7
replay:amq.gen-r5jjy4ypnSDso-HZ5PuNWA Server received: 21测试7 correlationId:87f6722d-e974-474d-a79c-9aea69401fa7
Got reply: Processed: 21测试7
Request sent with correlationId: de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
replay:amq.gen-7QDoBB5wqbjLC0MidVSkbA Server received: 24测试8 correlationId:de2a03f0-9d78-4dd8-b29d-3e904b4bb1dd
Got reply: Processed: 24测试8
Request sent with correlationId: 1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
replay:amq.gen-1rFRnN9vKCUt6HIrRLSoBw Server received: 27测试9 correlationId:1ce9cc12-0b24-4a19-9828-2a0dbc5ab4bc
Got reply: Processed: 27测试9

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.lryc.cn/news/583644.html

相关文章:

  • C++如何进行性能优化?
  • 19-C#静态方法与静态类
  • 【WEB】Polar靶场 21-25题 详细笔记
  • 从0开始学习R语言--Day42--LM检验
  • 异地组网
  • 数据分析框架和方法
  • Mac电脑,休眠以后,发现电量一直在减少,而且一个晚上,基本上是没了,开机都需要插电源的简单处理
  • 卫星通信终端天线的5种对星模式之二:功率检测型载波跟踪
  • 【PyTorch】PyTorch中数据准备工作(AI生成)
  • 深度学习——损失函数
  • Hexo + Butterfly + Vercel 完整个人Blog部署指南
  • Flask3.1打造极简CMS系统
  • 自动化Trae Apollo参数解释的批量获取
  • 股权结构解析
  • SpringBoot集成文件 - 大文件的上传(异步,分片,断点续传和秒传)
  • 专题一_双指针_查找总价格为目标值的两个商品
  • 拼多多正在错失即时零售?
  • ECR仓库CloudFormation模板完整指南
  • 【每日算法】专题六_模拟
  • WPF学习笔记(27)科学计算器
  • 1、专栏介绍以及目录
  • 周立功汽车软件ZXDoc深度解析:新能源汽车开发新基建的破局之道
  • eggNOG数据库注释文件
  • 以太网基础④IP 协议介绍与 IP 校验和算法实现
  • 【Linux网络编程】Socket - TCP
  • Java-----韩顺平单例设计模式学习笔记
  • swiglu 激活函数学习笔记
  • Java垃圾收集机制Test1
  • [Python] 区分方法 函数
  • 深度解析:将SymPy符号表达式转化为高效NumPy计算函数的通用解决方案