当前位置: 首页 > news >正文

springcloud RocketMQ 客户端是怎么走到消费业务逻辑的 - debug step by step

springcloud RocketMQ ,一个mq消息发送后,客户端是怎么一步步拿到消息去消费的?我们要从代码层面探究这个问题。

找的流程图,有待考究。
在这里插入图片描述

以下我们开始debug:


拉取数据的线程:
PullMessageService.java 本质是一个线程类

public class PullMessageService extends ServiceThread {private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();// ..
}

执行逻辑,一直循环拿取阻塞队列的内容,阻塞队列的内容由负载均衡服务提供。(阻塞队列中保存了目前客户端占有的 brokder - queue 信息)
在这里插入图片描述
然后进入 DefaultMQPushConsumerImpl.java 的 pullMessage(关键)
这里面有个关键的方法,this.pullAPIWrapper.pullKernelImpl(…) 这里传入了成功回调 pullCallback。
在这里插入图片描述
一直执行到 pullMessageAsync 是异步拉取消息,成功后会执行回调。
这里是成功消费后的回调。
成功后的回调逻辑:
在这里插入图片描述

ConsumeMessageConcurrentlyService.java 的 submitConsumeRequest 方法,将任务下发给消费者线程池 consumeExecutor (ThreadPoolExecutor 类型)去执行。(日志显示就是这里执行的消费业务)
在这里插入图片描述
~~
ok,我们看看开启的这个线程做了什么。
首先,单独一个线程是无法debug跨线程的,所以我们继续在 ConsumeMessageConcurrentlyService.ConsumeRequest 消费者请求线程中debug run方法,看看是怎么执行到我们的业务逻辑的。
发现是 监听器 listener 的消费逻辑
在这里插入图片描述
这个 listener 是一个接口,而且这个接口没有找到代码impl,也就是可能是匿名的视线
我们debug直接跳到了 RocketMQInboundChannelAdapter.java 的监听器,当时就是从这里把监听器注册进来的。
在这里插入图片描述
匿名方法执行了 RocketMQInboundChannelAdapter.this.consumeMessage
在这里插入图片描述
执行了一段 retry 逻辑(spring的重试框架),里面执行了发送消息逻辑。
在这里插入图片描述
发现底层用的是 spring 的 integration 消息通信框架!
debug进去send逻辑,会发送到一个 channel 中去
2
channel 里就有我们的处理方法的代理对象,是转发 dispatcher 的目标处理器 handlers 之一。
在这里插入图片描述
后面不出所料,就是通过反射去执行这个方法。
在这里插入图片描述
然后就跑到了我们的逻辑:
在这里插入图片描述

创作不易,希望点赞、收藏、关注支持~

http://www.lryc.cn/news/407381.html

相关文章:

  • GPT-4o mini小型模型具备卓越的文本智能和多模态推理能力
  • Milvus 向量数据库进阶系列丨部署形态选型
  • 【React】详解受控表单绑定
  • 使用puma部署ruby on rails的记录
  • 如何在Linux上使用Ansible自动化部署
  • scrapy爬取城市天气数据
  • 一天搞定React(5)——ReactRouter(下)【已完结】
  • 微信小程序之计算器
  • 【logstash】logstash使用多个子配置文件
  • 暴风骑士S9电摩上市,定义青少年骑行安全新标准
  • spring security如何适配盐存在数据库中的密码
  • Go语言编程 学习笔记整理 第2章 顺序编程 后半部分
  • 美团后端二面
  • 学懂C语言(十六):对C语言作用域规则 局部变量、全局变量的认识
  • 关于TS(typescript)的理论知识
  • 【OpenCV C++20 学习笔记】基本图像容器——Mat
  • 枚举单例是怎么保证线程安全和防止反射的
  • 传知代码-智慧医疗:纹理特征VS卷积特征(论文复现)
  • 数据结构中的八大金刚--------八大排序算法
  • ACC2.【C语言】经验积累 栈区简单剖析
  • c# 索引器
  • 低代码如何加速数字化转型
  • Pytest进阶之fixture的使用(超详细)
  • GitHub 详解教程
  • 边界网关IPSEC VPN实验
  • 力扣高频SQL 50题(基础版)第六题
  • 在一个事物方法中开启新事物,完成对数据库的修改
  • ffmpeg的vignetting filter
  • 商场导航系统:从电子地图到AR导航,提升顾客体验与运营效率的智能解决方案
  • vue3中父子组件的双向绑定defineModel详细使用方法