当前位置: 首页 > article >正文

RabbitMQ集群与负载均衡实战指南

文章目录

  • 集群架构
    • 概述
    • 仲裁队列的使用
      • 1. 使用Spring框架代码创建
      • 2. 使用amqp-client创建
      • 3. 使用管理平台创建
  • 负载均衡
    • 引入HAProxy 负载均衡:
    • 使用方法
      • 1. 修改配置文件
      • 2. 声明队列 test_cluster
      • 3. 发送消息

集群架构

概述

RabbitMQ支持部署多个结点,每个结点存储相同的数据,本质上没有区别。用户可以访问任意一个结点,其响应结果是一致的。每个结点都包含多个队列,队列的类型有很多,本博客主要探讨经典队列(Classic)和仲裁队列(Quorum)。不论什么类型的队列,都会存储两类消息:

  • 元数据:队列名称、交换机信息、绑定等
  • 消息数据:队列中存储的实际消息。

1. 经典队列(Classic Queues)

  • 特点
    • 元数据在集群所有节点共享,消息默认存储在主节点因此如果某个结点一旦宕机,对应存储的消息数据在该集群中将会丢失
    • 支持持久化(Durable)和非持久化。
    • 一致性较弱(最终一致性),性能高,灵活性强。
    • 适合通用场景,如任务分发、日志收集。

2. 仲裁队列(Quorum Queues)

  • 特点
    • 基于Raft 共识算法的高可用队列,消息数据会同步到集群中的其他节点,即使某个结点宕机,也能够保证集群的数据一致性。

    • 性能开销较高,不支持部分经典队列功能(如优先级)。

    • 适合对一致性要求高的场景,如金融交易、订单处理。

Raft 共识算法,这里用动画的方式形象的阐释了Raft保证数据一致性的执行流程,所以小编在这里偷个懒。

仲裁队列的使用

仲裁队列(Quorum Queue)是RabbitMQ中的一种高可用队列,它能够在节点故障时继续提供服务。以下是创建仲裁队列的三种方式:

1. 使用Spring框架代码创建

通过Spring的注解和配置,可以方便地创建仲裁队列。


@Configuration
public class QuorumConfig {@Bean("quorumQueue")public Queue quorumQueue() {return QueueBuilder.durable("quorum_queue").quorum().build();}
}

2. 使用amqp-client创建

通过Java代码直接使用amqp-client库来创建仲裁队列。


public class QuorumProducer {public static void main(String[] args) throws IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {Map<String, Object> param = new HashMap<>();param.put("x-queue-type", "quorum");channel.queueDeclare("quorum_queue", true, false, false, param);}}
}

3. 使用管理平台创建

通过RabbitMQ的管理平台,可以图形化地创建仲裁队列。


负载均衡

虽然RabbitMQ支持集群部署,看似好像提升了流量的承载力。但是如果请求只发送给一个或者那几个负载过高的结点,羊毛一直往一处薅,这个结点一旦挂掉,那么用户无法访问了!

解决办法——

引入HAProxy 负载均衡:

在这里插入图片描述

它和我们之前在Spirng中学的LoadBalence类似会把请求路由到正常的结点,并且个可以设定路由策略,充分利用每一个结点资源。

使用方法

在现代微服务架构中,负载均衡是确保服务高可用性和性能的关键技术之一。本文将介绍如何使用RabbitMQ实现负载均衡,并通过示例代码展示其具体实现步骤。

1. 修改配置文件

首先,需要修改RabbitMQ的配置文件,将HAProxy的IP和端口设置为RabbitMQ的绑定地址。

spring:rabbitmq:addresses: amqp://study:study@124.71.229.73:15670/test

2. 声明队列 test_cluster

在Spring Boot应用中,我们需要声明一个队列,用于负载均衡。


@Configuration
public class ClusterConfig {@Bean("ClusterQueue")public Queue clusterQueue() {return QueueBuilder.durable(Constant.CLUSTER_QUEUE).quorum().build();}
}

3. 发送消息

接下来,我们可以通过控制器发送消息到声明的队列。


@RestController
@RequestMapping("/cluster")
public class ClusterController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMappingpublic String cluster() {rabbitTemplate.convertAndSend("", Constant.CLUSTER_QUEUE, "quorum test...");return "发送成功!";}
}

或者使用amqp客户端发送消息:


public class ClusterProducer {private static final String QUEUE_NAME = "hello_world";public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 2. 设置参数factory.setHost("124.71.229.73"); // HAProxy 地址factory.setPort(5670); // HAProxy 端口factory.setUsername("host"); // 用户名,默认factory.setPassword("study"); // 密码,默认// 3. 创建连接connectionConnection connection = factory.newConnection();// 4. 创建channel通道Channel channel = connection.createChannel();// 5. 声明队列Map<String, Object> param = new HashMap<>();param.put("x-queue-type", "quorum");channel.queueDeclare("test_cluster", true, false, false, param);// 6. 通过channel发送消息到队列中String msg = "hello cluster...";// 简单模式下,使用的是默认交换机,使用默认交换机时,routingKey要和队列名称一致,才可以路由到对应的队列中去channel.basicPublish("", "test_cluster", null, msg.getBytes());// 7. 释放资源System.out.println("消息发送成功!");connection.close();}
}
http://www.lryc.cn/news/2391852.html

相关文章:

  • 怎么开机自动启动vscode项目
  • Unity 中 Update、FixedUpdate 和 LateUpdate 的区别及使用场景
  • linux安装ffmpeg7.0.2全过程
  • Java中的设计模式实战:单例、工厂、策略模式的最佳实践
  • DexGarmentLab 论文翻译
  • Elasticsearch性能优化全解析
  • 2025.05.28【Parallel】Parallel绘图:拟时序分析专用图
  • tc3975开发板上有ft2232这块的电路,我想知道这个开发板有哪些升级方式,重点关注是怎样通过ft2232实现的烧录升级的
  • 自动驾驶与智能交通:构建未来出行的智能引擎
  • Kotlin Multiplatform与Flutter深度对比:跨平台开发方案的实战选择
  • ELectron 中 BrowserView 如何进行实时定位和尺寸调整
  • 深兰科技董事长陈海波率队考察南京,加速AI大模型区域落地应用
  • 《深度关系-从建立关系到彼此信任》
  • IT选型指南:电信行业需要怎样的服务器?
  • 【ConvLSTM第二期】模拟视频帧的时序建模(Python代码实现)
  • [VMM]分享一个用SystemC编写的页表管理程序
  • 将docker数据目录迁移到 home目录下
  • 【论文解读】DETR: 用Transformer实现真正的End2End目标检测
  • Pytest 是什么
  • ElasticSearch简介及常用操作指南
  • 缓存常见问题:缓存穿透、缓存雪崩以及缓存击穿
  • 纤维组织效应偏斜如何影响您的高速设计
  • 【深度学习】sglang 的部署参数详解
  • SDL2常用函数:SDL_RendererSDL_CreateRendererSDL_RenderCopySDL_RenderPresent
  • [git]忽略.gitignore文件
  • FEMFAT许可的有效期限
  • Rust使用Cargo构建项目
  • Python训练营打卡Day39
  • UE5蓝图中播放背景音乐和使用代码播放声音
  • AI 赋能数据可视化:漏斗图制作的创新攻略