当前位置: 首页 > news >正文

Kafka服务端NIO操作原理解析(二)

Kafka系列文章

基于Kafka2.1解读Producer原理
基于Kafka2.1解读Consumer原理
Kafka服务端NIO操作原理解析(一)


文章目录

  • Kafka系列文章
  • 前言
  • 一、基本认知
  • 二、Acceptor的主体流程
    • 2.1 run方法源码
    • 2.2 acceptNewConnections方法源码
    • 2.3 主体逻辑流程示意图
  • 三、Processor的主体流程
    • 3.1 run方法源码
    • 3.2 主体逻辑流程示意图
      • 3.2.1 configureNewConnections
      • 3.2.2 processNewResponses
      • 3.2.3 poll
        • pollSelectionKeys
      • 3.2.4 processCompletedReceives
      • 3.2.5 processCompletedSends
  • 四、问题
  • 总结


前言

废话不多说,继续上一篇文章,我们继续基于Kafka3.7解读服务端的nio原理


Kafka服务端IO类关系图

一、基本认知

可以看到Acceptor和Processor都是线程,所以实际Kafka服务端在启动(执行startup)方法之后,会基于一个Acceptor多个Processor的模式启动,并把这多个线程执行起来。

二、Acceptor的主体流程

acceptor的类图示意图
对于Acceptor来说,主要就是通过selector监听accept事件,然后选择一个processor来进行后续操作

2.1 run方法源码

  /*** Accept loop that checks for new connection attempts*/override def run(): Unit = {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)try {while (shouldRun.get()) {try {acceptNewConnections()closeThrottledConnections()}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket, selector, and any throttled sockets.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket, this))throttledSockets.clear()}}

可以看到主要流程在acceptNewConnections中,下面我们看看acceptNewConnections代码

2.2 acceptNewConnections方法源码

  /*** Listen for new connections and assign accepted connections to processors using round-robin.*/private def acceptNewConnections(): Unit = {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && shouldRun.get()) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {accept(key).foreach { socketChannel =>// Assign the channel to the next processor (using round-robin) to which the// channel can be added without blocking. If newConnections queue is full on// all processors, block until the last one is able to accept a connection.var retriesLeft = synchronized(processors.length)var processor: Processor = nulldo {retriesLeft -= 1processor = synchronized {// adjust the index (if necessary) and retrieve the processor atomically for// correct behaviour in case the number of processors is reduced dynamicallycurrentProcessorIndex = currentProcessorIndex % processors.lengthprocessors(currentProcessorIndex)}currentProcessorIndex += 1} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))}} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")} catch {case e: Throwable => error("Error while accepting connection", e)}}}}

2.3 主体逻辑流程示意图

Acceptor的主体流程示意图
注意看到Processor的accept操作做了两件事

1. 将socketChannel放到了自身的newConnections里
2. wakeup一下自身的selector

备注:newConnections是线程安全的ArrayBlockingQueue

三、Processor的主体流程

Processor的类图关系示意图

每个Processor是会处理多个socketChannel的:

channels变量维护多个KafkaChannel
explicitlyMutedChannels:维护的是被mute掉的channels
completedReceives:维护单次poll操作从每个channel读取到的数据
completedSends:维护的是单次poll操作已经通过nio写出去的数据

explicitly:强调 “主动、明确地”,即用户通过手动操作(而非系统默认或其他自动设置)进行的行为
注意:completedReceives和completedSends单次执行poll操作的数据

3.1 run方法源码

override def run(): Unit = {try {while (shouldRun.get()) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()poll()processCompletedReceives()processCompletedSends()processDisconnected()closeExcessConnections()} catch {// We catch all the throwables here to prevent the processor thread from exiting. We do this because// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would// be either associated with a specific socket channel or a bad request. These exceptions are caught and// processed by the individual methods above which close the failing channel and continue processing other// channels. So this catch block should only ever see ControlThrowables.case e: Throwable => processException("Processor got uncaught exception.", e)}}} finally {debug(s"Closing selector - processor $id")CoreUtils.swallow(closeAll(), this, Level.ERROR)}}

3.2 主体逻辑流程示意图

processor主体流程示意图
可以看到我这个主体流程示意图并没有把processDisconnected()和closeExcessConnections()方法放进来,因为这两个方法对于我们理解Kafka的nio不太重要,所以暂时忽略

3.2.1 configureNewConnections

1. 从自身的newConnections里获取socketChannel
2. 将socketChannel封装成KafkaChannel,并在selector上注册读事件

3.2.2 processNewResponses

一句话总结:将需要发送出去的数据拷贝到KafkaChannel上,也就是做真正发送的准备操作

1. 从自身responseQueue里poll一个可发送的response
2. 将response拷贝到对应KafkaChannel的send对象上
3. 在selector上注册写事件
4. 同时将response放到inflightRespones里:为后续执行回调函数使用

当然,responseQueue也是线程安全的,不过是LinkedBlockingDeque

3.2.3 poll

这个方法是服务端真正执行IO操作的逻辑,包括读和写

1. 清掉completedReceives和completedSends
2. 执行poll操作
3. 处理poll到的selectionKeys
pollSelectionKeys

attemptRead

1. 从socketChannel读取数据
2. 将读取到的数据存到selector的completedReceives里
3. 把当前KafkaChannel里的receive给清掉

attemptWrite
注意:此处KafkaChannel上的send数据来自于「3.2.2 processNewResponses」

4. 将KafkaChannel上send数据通过KafkaChannel写出去
5. 将该数据append到selector的completedSends上
6. 把当前KafkaChannel里的send给清掉

3.2.4 processCompletedReceives

一言以蔽之:对上一步「3.2.3 poll」读取到的数据进行处理

1. 将读取到数据封装成Request放到requestChannel的requestQueue里
2. 将该数据的Channel给禁言:mute
3. 清掉整个selector的completedReceives

mute操作主要两件事:①删除掉该channel在selector上注册的读事件②将该channel放到explicitlyMutedChannels里

3.2.5 processCompletedSends

一言以蔽之:对「3.2.3 poll」操作里发出去的response执行回调

1. 从inflightRespones里读取response,执行该response的回调方法
2. 如果该channel被禁言了,解除禁言
3. 清掉整个selector的completedSends

解除禁言操作主要两件事:①该channel在selector上注册读事件②将该channel从explicitlyMutedChannels里remove掉

四、问题

看完processor的主要操作,咱们就冒出两个问题:

  1. processor的responseQueue里的数据是谁写入的?
  2. 我们写到requestChannel#requestQueue里的数据谁去处理了?

这两个问题就是Kafka server的核心计算逻辑了,而本文着重讲了Kafka server的核心IO逻辑


总结

Acceptor和Processor两大组件主体流程示意图

在上一篇简单概括加原生nio的引导之后,本文详细介绍了Kafka server端Acceptor和Processor是如何工作来处理读入和写出的逻辑。
后续咱们就要基于咱们的问题来介绍Kafka server的计算逻辑了:读进来的数据怎么处理,写出去的response是怎么来的~

http://www.lryc.cn/news/615326.html

相关文章:

  • Arm Development Studio 安全通告:CVE-2025-7427
  • 人脸情绪检测数据集-9,400 张图片 智能客服系统 在线教育平台 心理健康监测 人机交互优化 市场研究与广告 安全监控系统
  • 【面试题】cookie和session 的区别
  • 【26】C#实战篇—— 多个线程函数对同一个 Excel 文件进行写操作引起的文件冲突问题,解决方法
  • Playwright C# 自动登录并上传 Excel 文件 的可运行示例
  • Irix HDR Pro:专业级 HDR 图像处理软件
  • Docker部署whisper转写模型
  • Java中Lambda表达式的常见用法和解析:从入门到实战
  • C/C++基础详解(二)
  • 【51单片机4按键启动停止向上向下流水灯】2022-10-26
  • 本文章分享一个本地录音和实时传输录音给app的功能(杰理)
  • 【c++】探秘Loop机制:C++中优雅的双向数据交互模式
  • Ubuntu下安全彻底删除后端服务完整指南
  • 网络原理-初识
  • PNPM总结
  • QT第一讲- Qt初探
  • Microsoft Office Visio(流程图)学习笔记
  • 使用SymPy lambdify处理齐次矩阵的高效向量化计算
  • 动手学深度学习(pytorch版):第二章节——预备知识(1)——数据操作
  • 2025华数杯数学建模C题:可调控生物节律LED光源全解析
  • 理解协议最大传输单元(MTU)和TCP 最大报文段长度(MSS)
  • 自动生成视频的AI大模型高效创作指南
  • 掌握数据可视化:全局配置项详解
  • Nginx 反向代理与负载均衡架构
  • Redhat Linux 9.6 配置本地 yum 源
  • qt文件操作与qss基础
  • 2025彩虹易支付官方正版无删减完整版源码
  • B.10.01.5-电商系统的设计模式应用实战
  • 【Canvas与旗帜】圆角蓝底大黄白星十一红白带旗
  • Node.js特训专栏-实战进阶:22. Docker容器化部署