当前位置: 首页 > news >正文

RocketMQ源码学习笔记:Producer启动流程

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
    • 1.1、创建MQClientInstance
      • 1.1.1、检查
      • 1.1.1、MQClientInstance的ID
    • 1.2、MQClientInstance.start()

1、Overview

在这里插入图片描述

这是发送信息的代码样例,

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}
}
producer.shutdown();

生产者启动最少需要两个信息,groupnameSrvAddr。启动的源码则是producer#start()中。

最终RocketMQ会创建MQClientInstance的实例,然后在调用MQClientInstance#start()完成生产者的启动。

1.1、创建MQClientInstance

1.1.1、检查

代码线索DefaultMQProducer#start() -> DefaultMQProducerImpl#start() -> DefaultMQProducerImpl#checkConfig()

创建MQClientInstance前做前置检查,主要是检查group的格式,并且不能和系统的group重命。

1.1.1、MQClientInstance的ID

MQClientInstanceMQClientManager进行管理。MQClientManager整个JVM中只有一个实例,其内部用ConcurrentMap<String, MQClientManager>管理着所有的MQClientInstance,其中的String可以看成是每个MQClientInstance的id,下面通过源码查看id是如何组成的。

代码线索DefaultMQProducer#start() -> DefaultMQProducerImpl#start() -> MQClientManager#getInstance()#getOrCreateMQClientInstance() -> ClientConfig#buildMQClientId()

public String buildMQClientId() {StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}if (enableStreamRequestType) {sb.append("@");sb.append(RequestType.STREAM);}return sb.toString();
}

很明显,每个MQClientInstance的ID主要是由IPinstanceNameunitName组成,其中instanceNameunitName都可以设置。所以如果我们想要创建多个MQClientInstance使用的话,可以设置不同的instanceNameunitName


1.2、MQClientInstance.start()

启动一些线程池,心跳服务。

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel// NRC startthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
http://www.lryc.cn/news/388212.html

相关文章:

  • Node.js 和浏览器环境中都使用 WebSocket
  • css美化滚动条样式
  • 由浅入深,走进深度学习(补充篇:转置卷积和FCN)
  • Linux基础篇——目录结构
  • 星际编码:Swifter.Json,.NET宇宙中的数据处理新星
  • python 压缩数据
  • nacos在k8s上的集群安装实践
  • 数据结构—判断题
  • 树莓派挂载的移动硬盘badblocks坏道屏蔽,以这个为准
  • Unity开箱即用的UGUI面板的拖拽移动功能
  • 春秋云境:CVE-2022-25411[漏洞复现]
  • java基础知识点全集
  • 如何完成域名解析验证
  • 2024年6月个人工作生活总结
  • Json与Java类
  • 动手学深度学习(Pytorch版)代码实践 -计算机视觉-39实战Kaggle比赛:狗的品种识别(ImageNet Dogs)
  • 在Linux系统中挂载硬盘
  • 安卓短视频去水印v1.7 简洁好用
  • 【征服数据结构】:期末通关秘籍
  • GIT 基于master分支创建hotfix分支的操作
  • Vue-CLI脚手架与node.js安装
  • 自适应站长跑路单页网站源码
  • Java基础(判断和循环)
  • 51单片机第12步_使用stdio.h库函数仿真串口通讯
  • simulink-esp32开发foc电机
  • Python教程--基本技能
  • 干货分享:Spring中经常使用的工具类(提示开发效率)
  • 一文讲懂npm link
  • 观成科技:证券行业加密业务安全风险监测与防御技术研究
  • 使用Swoole开发高性能的Web爬虫