当前位置: 首页 > news >正文

【RocketMQ每日一问】RocketMQ SQL92过滤用法以及原理?

1.生产端

public class SQLProducer {public static int count = 10;public static String topic = "xiao-zou-topic";public static void main(String[] args) {DefaultMQProducer producer = MQUtils.createLocalProducer();IntStream.range(0, count).forEach(i -> {Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));try {if (i % 2 == 0) {message.putUserProperty("gray", "dev1");}SendResult sendResult = producer.send(message);DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));}catch (Exception e) {throw new RuntimeException(e);}});producer.shutdown();}
}

2.消费端

public class SQLConsumer {public static String GID = "xiao-zou-gid";public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);String sql = "gray is not null and gray = 'dev1'";consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});/**  Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}

3.语法规则

4.原理

  1. 当消息到达 Broker 时,Broker 会将消息与对应的订阅关系进行匹配。

  2. 如果该订阅关系包含 SQL92 表达式,则将该表达式传递给消息过滤器。

  3. 消息过滤器使用 Antlr4 解析器解析 SQL92 表达式,并将其转换为语法树。

  4. 一旦表达式被转换为语法树,过滤器就可以开始遍历语法树,并使用消息属性和自定义属性来匹配表达式中的条件。

  5. 如果消息属性和自定义属性匹配 SQL92 表达式中的条件,则过滤器将消息传递给消费者。

  6. 如果消息属性和自定义属性不匹配 SQL92 表达式中的条件,则过滤器将跳过该消息,并继续匹配其他消息。

http://www.lryc.cn/news/272441.html

相关文章:

  • Go语言中的包管理工具之Go Path的使用
  • cocos creator(2.4.7版本) webview 可以在上层添加UI控件
  • 2023 年四川省职业院校技能大赛“信息安全管理与评估”样题
  • ubuntu2204,mysql8.x安装
  • CG Magic分享云渲染和本地渲染之间的区别有什么?
  • 【算法与数据结构】763、LeetCode划分字母区间
  • 新火种AI|人形机器人敲响上市罗,首日市值高达390亿港元
  • SpringMVC框架
  • FreeRTOS——计数型信号量知识总结及实战
  • Linux下Docker Engine安装后的一些配置步骤
  • 【并发设计模式】聊聊Balking是如何实现以及具体原理
  • dubbo的一些问题思考
  • 盛最多水的容器(力扣11题)
  • .babky勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • 20240103-通过布局让自己的生活有有意义人生有价值
  • JDK17 - 开发者视角,从 JDK8 ~ JDK17 都增加了哪些新特性
  • 八股文打卡day11——计算机网络(11)
  • 在Android设备上设置和使用隧道代理HTTP
  • Paddle3D 2 雷达点云CenterPoint模型训练
  • RabbitMQ集群的简单说明
  • 支付宝沙箱支付-验签出错之编码集异常
  • 图像分割-漫水填充法 floodFill (C#)
  • 在pycharm中jupyter连接上了以后显示无此库,但是确实已经安装好了某个库,使用python可以跑,但是使用ipython就跑不了
  • C++多态性——(3)动态联编的实现——虚函数
  • docker部署mysql
  • python代码大全(持续更新)
  • C#学习笔记 - C#基础知识 - C#从入门到放弃 - C# 处理程序异常相关技术
  • [python]项目怎么使用第三方库
  • java每日一题——双色球系统(答案及编程思路)
  • java的mybatis