当前位置: 首页 > news >正文

《RockectMQ实战与原理解析》Chapter4-分布式消息队列的协调者

4.1 NameServer 的功能

        NameServer 是整个消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。 同时,各个角色的机器都要定期向 NameServer 上报自己的状态超时不上报的话, NameServer 会认为某个机器出故障不可用了,其他的组件会把这个机器从可用列表里移除

        NamServer 可以部署多个,相互之间独立,其他角色同时向多个 NameServer机器上报状态信息,从而达到热备份的目的。 NameServer 本身是无状态的,也就是说 NameServer 中的 Broker 、 Topic 等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中的(NameServer 支持配置参数的持久化, 一般用不到) 。

         4.1.1 集群状态的存储结构 

        org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager有五个变量RouteInfoManager.javahttps://gitee.com/apache/rocketmq/blob/develop/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java

    /***      这个结构的 Key 是 Topic 的名称,它存储了所有 Topic的属性信息。*  Value 是个 QueueData 队列 , 队里的长度等于这个 Topic数据存储的*  Master Broker 的个数, QueueData 里存储着 Broker 的名称、读写*  queue 的数量 、 同步标识等*/private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;/***      以 BrokerName 为索引 ,相同名称的 Broker 可能存在多台机器, 一个*   Master 和多个 Slave 。 这个结构存储着一个 BrokerName 对应的属性信*   息,包括所属的 Cluster 名称, 一个 Master Broker 和多个 Slave Broker *   的地址信息 。*/private final Map<String/* brokerName */, BrokerData> brokerAddrTable;/***      存储的是集群中 Cluster 的信息,结果很简单,就是一个 Cluster 名称对*  应一个由 BrokerName 组成的集合。*/private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;/***       这个结构和 BrokerAddrTable 有关系,但是内容完全不同,这个结构的*  Key 是 BrokerAddr ,也就是对应着一台机器, BrokerAddrTable 中的 Key*  是 BrokerName , 多个机器的 BrokerName 可以相同 。 BrokerLiveTable*  存储的内容是这台 Broker 机器的实时状态,包括上次更新状态的时间戳, *  NameServer 会定期检查这个时间戳 ,超时没有更新就认为这个 Broker 无效了,*  将其从 Broker 列表里清除。*/private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;/***      FilterServer 是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一*  个 Broker 可以有 一个或多个 FilterServer 。 这个结构的 Key 是 Broker*  的地址, Value 是和这个 Broker 关联的多个 FilterServer 的地址。*/private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

        4.1.2 状态维护逻辑

        因为其他角色会主动向Name Server 上报状态,所以 NameServer 的主 要逻 辑在 DefaultRequest ­Processor 类中,根据上报消息里的请求码做相 应 的处理, 更新存储的对应信息 。 此外,连接断开的 事 件也 会 触发状态 更新,具体逻辑在org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService

        当 NameServer 和 Broker 的长连接断掉以后, onChannelDestroy 函数会被调用,把这个 Broker 的信息清理出去 。

        NameServer 还有定时检查时间戳的逻辑 , Broker 向 NameServer 发送的心跳会更新时间戳, 当 NameServer 检查 到时间戳长时间没有更新后,便会触发清理逻辑 。

org.apache.rocketmq.namesrv.NamesrvController 每5s检查一次

 4.2 各个角色间的交互流程

        4.2.1 交互流程源码分析

        创建 Topic 的代码 是 在  org.apache.rocketmq.tools.command.topic 里的UpdateTopicSubCommand 类中,创建 Topic 的命令 是 updateTopic

UpdateTopicSubCommand.javahttps://gitee.com/apache/rocketmq/blob/develop/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java

         org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute 调用org.apache.rocketmq.tools.admin.DefaultMQAdminExt#createAndUpdateTopicConfig

是向 NameServer 发 送注 册 信 息, NameServer 完 成创 建Topic 的逻辑后,其他客户端才能发现新增 的 Topic ,相关逻辑在org.apache.rocketmq.namesrv. routeinfo.RouteInfoManager#registerBroker,首先更新 Broker 信息,然后对每个 Master 角色的 Broker ,创建一个QueueData 对象。 如果是新建 Topic ,就是添加 QueueData 对象;如果是修改Topic ,就是把旧的 QueueData 删除 , 加入新的 QueueData 。

        4.2.2 为何不用 ZooKeeper

        RocketMQ 的架构设计决定了它不需要进行 Master 选举,用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了 。 5 张图告诉你 RocketMQ 为什么不使用 Zookeeper 做注册中心 - 腾讯云开发者社区-腾讯云 (tencent.com)https://cloud.tencent.com/developer/article/2118883

4.3 底层通信机制 

        4.3.1 Remoting 模块

        RocketMQ 的通信相关代码在 Remoting 模块里,先来看看主要类结构 

 

 NettyRemotingClient、NettyRemotingServer而 且 都继承了NettyRemoting-Abstract 类 。通过上面的封装 , RocketMQ 各个模块间的通信, 可以通过发送统一格式的自定义消息 ( RemotingCommand ) 来完成, 各个模块间的通信实现简洁明了 。

        比如 NameServer 模 块中, NameServerController 有 一 个 remotingServer 变量 , NameServer 在启动时初始化各个变量 , 然后启 动 remotingServer 即可,剩下 NameServer 要 做的 是专心实现处理 RemotingCommand 的逻辑。 

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

 

        4.3.2 协议设计和编解码

        RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有意义 的内 容之间互相转换。  

         1 )第一部分是大端 4 个字节整数,值等于第二、三、 四部分长度的总和;

        2 )第二部分是大端 4 个字节整数,值等于第三部分的长度;

        3 )第三部分是通过 Json 序列化的数据;

        4 )第四部分是通过应用自定义二进制序列化的数据。

        消息的解码过程在 RemotingCommand 的 decode 函数里org.apache.rocketmq.remoting.protocol.RemotingCommand#decode(io.netty.buffer.ByteBuf)

org.apache.rocketmq.remoting.protocol.RemotingCommand#encode

        4.3.3 Netty 库 

        RocketMQ 是基于 Netty 库来完成 RemotingServer 和 RemotingClient 具体的通信实现的, Netty 是个事件驱动的网络编程框架,它屏蔽了 Java Socket 、 NIO等复杂细节,用户只需用好 Netty ,就可以实现一个“ 网络编程专家+并发编程专家”水平的 Server 、 Client 网络程序 。 应用 Netty 有一定的门槛,需要了解它的 EventLoopGroup 、 Channel 、 Handler 模型以及各种具体的配置。 RocketMQ利用 Netty 实现的通信类是 NettyRemotingServer 和 NettyRemotingClient ,用户也可以参考这两个类的实现来学习使用 Netty 。

http://www.lryc.cn/news/59512.html

相关文章:

  • Spring Boot 最适配的 UI 是什么
  • TensorFlow 1.x 深度学习秘籍:6~10
  • 分布式场景下,Apache YARN、Google Kubernetes 如何解决资源管理问题?
  • RK3399平台开发系列讲解(基础篇)POSIX 定时器
  • web小游戏开发:扫雷(三)(完成度90%)
  • 创建菜单栏、菜单、菜单项
  • 专访丨AWS量子网络中心科学家Antía Lamas谈量子计算
  • [ 云计算 | Azure ] Chapter 04 | 核心体系结构之数据中心、区域与区域对、可用区和地理区域
  • 升级长江存储最新闪存,忆恒创源发布新一代企业级NVMe SSD
  • Xcode14:”Failed to prepare the device for development“解决
  • 程序员的“灵魂笔记本“:五款高效笔记软件推荐
  • Linux基础命令-scp远程复制文件
  • 【python学习】基础篇-列表元素排序操作 sort()、min()、max()函数
  • 机器视觉检测系统的基本流程你知道吗
  • 【vue】Vue 开发技巧:
  • Kubebuilder Hello World
  • SpringSecurity之权限方案——用户认证
  • 本地电脑轻松部署GPT4(无需账号)!
  • 每天一道大厂SQL题【Day21】华泰证券真题实战(三)
  • 腾讯云8核16G18M轻量服务器CPU带宽流量性能测评
  • 算法之归并排序
  • Mysql日志系统-mysql serve层
  • 阿里云蔡英华:云智一体,让产业全面迈向智能
  • 打怪升级之FPGA组成原理(LE部分)
  • c++的多态
  • 【数据结构与算法】堆的实现(附源码)
  • win10彻底永久关闭自动更新【亲测有效】
  • 【Unity UPR】造个获取深度法线纹理的轮子
  • 用 Python解析HTML页面
  • python logging 详解