当前位置: 首页 > news >正文

RocketMQ系统性学习-RocketMQ原理分析之消费者的接收消息流程

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

消费者的接收消息流程

还是先把消费者接收消息的流程图贴出来,再细说代码流程:
在这里插入图片描述

首先先从消费者的业务调用出发

// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// ...
// 注册监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
// 启动消费者
consumer.start();

那么我们就从 consumer.start() 进入,看一下消费者的启动逻辑,该方法的核心代码也就是:

this.defaultMQPushConsumerImpl.start();

那么进入到这个 start 方法,这里进行了一些配置以及客户端的启动:

  1. 通过 checkConfig() 检查消费组的一些配置:名称是否符合规范、消费者的线程数、消费者的监听等等
  2. 之后再设置一些属性
  3. 通过 mQClientFactory.start() 启动客户端

那么我们进入到启动客户端这个逻辑,我们猜测这里 start 之后,可能就可以进行消息的拉取了,那么在 start 这个方法中,看到了有下边这一行:

this.pullMessageService.start();

这不正是拉取消息的服务吗?点进去之后,发现就是启动了一个线程,这个线程呢就是 this,那么我们点进去这个 start 方法是定义在 ServiceThread 类中,这个类并没有定义 run 方法,因此呢,这个 run 方法应该是定义在了子类 PullMessageService 类中,点进去找到 run 方法,可以看到在 run 方法中就会不停地去 messageRequestQueue 中拉取数据:

MessageRequest messageRequest = this.messageRequestQueue.take();

既然在这里拉取数据了,那么数据是什么时候放到 messageRequestQueue 中的呢?

只需要搜一下哪里调用到了 this.messageRequestQueue.put 就可以知道了,找到之后呢,我们在这一行打个断点,再去启动生产者,就可以知道整个调用链了,

在这里插入图片描述

那么根据栈调用情况呢,可以发现这一行是通过 RebalanceService 的 run 方法进入的,那么这个 RebalanceService 一定是在哪里作为一个线程被启动了

在这里插入图片描述

那么呢,我们之前说了在启动客户端的时候,调用 this.pullMessageService.start() 启动了这个线程,那么在下一行就启动了 rebalanceService 这个线程:

在这里插入图片描述

因此呢,就通过 debug 的方式找到了向 messageRequestQueue 中存放消息就是在 RebalanceService 这个线程中做的

http://www.lryc.cn/news/264389.html

相关文章:

  • butterfly蝴蝶分类
  • 计算机基础:网络基础
  • [原创][R语言]股票分析实战[3]:周级别涨幅趋势的相关性
  • MSVC编译 openssl windows 库
  • electron兼容统信UOS系统过程中的坑
  • Flink系列之:Apache Kafka SQL 连接器
  • 灰盒测试简要学习指南!
  • 【经典LeetCode算法题目专栏分类】【第7期】快慢指针与链表
  • springboot解决XSS存储型漏洞
  • I.MX6ULL_Linux_驱动篇(47)linux RTC驱动
  • 详解IBM企业架构框架模型CBM
  • 宝塔面板安装MySQL数据库并通过内网穿透工具实现公网远程访问
  • Elasticsearch 性能调优基础知识
  • 速盾网络:网络安全守护者
  • jmeter如何参数化?Jmeter参数化设置的5种方法
  • 01AVue入门(持续学习中)
  • js 深浅拷贝的区别和实现方法
  • 【jvm从入门到实战】(九) 垃圾回收(2)-垃圾回收器
  • C#基础——匿名函数和参数不固定的函数
  • PCL 点云匹配 4 之 (非线性迭代点云匹配)lM-ICP
  • MySQL_14.数据库高速缓冲区空间管理
  • leetcode 974. 和可被 K 整除的子数组(优质解法)
  • 【技术】MySQL 日期时间操作
  • 测试理论知识三:测试用例、测试策略
  • 【clickhouse】在CentOS中离线安装clickhouse
  • 微信商户号申请0.2费率
  • 基于单片机设计的电子指南针(LSM303DLH模块(三轴磁场 + 三轴加速度)
  • 深度学习 该用什么标准判断差异最小
  • 汽车制造厂设备故障预测与健康管理PHM
  • 如何通过宝塔面板搭建一个MySQL数据库服务并实现无公网ip远程访问?