当前位置: 首页 > news >正文

RocketMQ: 消息过滤,通信组件,服务发现

消息过滤


1 ) 简单消息过滤

/*** 订阅指定topic下tags分别等于 TagA 或 TagC 或 TagD
*/consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
  • 如以上代码所示,简单消息过滤通过指定多个 Tag 来过滤消息,过滤的动作在服务器进行

2 ) 高级消息过滤

    1. Broker 所在的机器会启劢多个 FilterServer 过滤迕程
    1. Consumer 启劢后,会吐 FilterServer 上传一个过滤的 Java 类
    1. Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer
  • 总结
    • 使用 CPU 资源来换取网卡流量资源
    • FilterServer 与 Broker 部署在同一台机器,数据通过本地回环通信,不走网卡
    • 一台 Broker 部署多个 FilterServer,充分利用 CPU 资源,因为单个 JVM 难以全面利用高配的物理机 CPU 资源
    • 因为过滤代码使用 Java 语言来编写,应用几乎可以做任意形式的服务器端消息过滤,例如通过 Message Header 进行过滤,甚至可以按照 Message Body 进行过滤
    • 使用 Java 语言进行作为过滤表达式是一个双刃剑,方便了应用的过滤操作,但是带来了服务器端的安全风险。需要应用来保证过滤代码安全,例如在过滤程序里尽可能不做申请大内存,创建线程等操作。避免 Broker 服务器发生资源泄漏

通信组件

  • RocketMQ 通信组件使用了 Netty-4.0.9.Final,在之上做了简单的协议封装

1 )网络协议

在这里插入图片描述

  1. 大端 4 个字节整数,等于 2、3、4 长度总和
  2. 大端 4 个字节整数,等于 3 的长度
  3. 使用 json 序列化数据
  4. 应用自定义二进制序列化数据
  • Header 格式
    {"code": 0,"language": "JAVA","version": 0,"opaque": 0,"flag": 1,"remark": "hello, I am respponse /127.0.0.1:27603","extFields": {"count": "0","messageTitle": "HelloMessageTitle"}
    }
    
Header 字段名类型RequestResponse
code整数请求操作代码,请求接收方
根据不同的代码做不同的操作
应答结果代码,0 表示成功,
非 0 表示各种错误代码
language字符串请求发起方实现语言,默认JAVA应答接收方实现语言
version整数请求发起方程序版本应答接收方程序版本
opaque整数请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用应答方不做修改,直接返回
flag整数通信局的标志位通信局的标志位
remark字符串传输自定义文本信息错误详细描述信息
extFieldsHashMap<String,String>请求自定义字段应答自定义字段

2 )心跳处理

  • 通信组件本身不处理心跳,由上局进行心跳处理

3 )连接复用

  • 同一个网络连接,客户端多个线程可以同时发送请求,应答响应通过 header 中的 opaque 字段来标识

4 )超时连接

  • 如果某个连接超过特定时间没有活动(无读写事件),则自动关闭此连接
  • 并通知上层业务,清除连接对应的注册信息

RocketMQ 服务发现(Name Server)

  • Name Server 是专为 RocketMQ 设计的轻量级名称服务,代码小于 1000 行
  • 具有简单、可集群横向扩展、无状态等特点
  • 将要支持的主备自动切换功能会强依赖 Name Server
http://www.lryc.cn/news/490136.html

相关文章:

  • linux ubuntu的脚本知
  • HTTP有哪些风险?是怎么解决的?
  • 3.12MayBeSomeLinearAlgebra
  • 学习日志015--python单链表
  • 如何在Windows右键新建菜单中添加自定义项
  • Spring Boot 3.0废弃了JavaEE,改用了Jakarta EE
  • pdf文档动态插入文字水印,45度角,旋转倾斜,位于文档中央,多行水印可插入中文
  • [ 渗透测试面试篇-2 ] 针对大规模资产的攻击思路
  • 深入解析 Web 应用中的 CHIPS(Partitioned Cookie Attribute)
  • 从搭建uni-app+vue3工程开始
  • 归并排序与逆序对问题(C语言版)
  • 网络爬虫总结与未来方向
  • C++ 核心数据结构:Stack 与 Queue 类深度解析
  • Python枚举类详解:用enum模块高效管理常量数据
  • 企业OA管理系统:Spring Boot技术深度探索
  • 汽车免拆诊断案例 | 2012款路虎揽胜运动版柴油车加速无力
  • uniapp接入高德地图
  • (UI自动化测试)web自动化测试
  • 【es6进阶】如何使用Proxy实现自己的观察者模式
  • 住宅IP怎么在指纹浏览器设置运营矩阵账号
  • 表格数据处理中大语言模型的微调优化策略研究
  • CentOS7 如何查看kafka topic中的数据
  • VRRP实现出口网关设备冗余备份
  • 超详细:Redis分布式锁
  • Vue与React的Suspense组件对比
  • Spring框架深度剖析:特性、安全与优化
  • 硬盘文件误删:全面解析、恢复方案与预防策略
  • tcpdump抓包 wireShark
  • Android system_server进程
  • Vue3+element-plus 实现中英文切换(Vue-i18n组件的使用)