当前位置: 首页 > news >正文

Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

http://www.lryc.cn/news/528898.html

相关文章:

  • 网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。
  • 【go语言】结构体
  • Spring Boot是什么及其优点
  • 谷氨酸:大脑功能的多面手
  • SpringCloudGateWay和Sentinel结合做黑白名单来源控制
  • HTML新春烟花
  • 【Elasticsearch】中数据流需要配置索引模板吗?
  • Git进阶之旅:Git 配置信息 Config
  • buu-pwn1_sctf_2016-好久不见29
  • ES2021+新特性、常用函数
  • STM32——LCD
  • 【redis进阶】分布式锁
  • 园区管理系统如何提升企业核心竞争力与资产管理智能化水平
  • AI大模型开发原理篇-3:词向量和词嵌入
  • 高精度算法:高精度减法
  • Java创建项目准备工作
  • 基于STM32的智能宠物喂食器设计
  • 在线课堂小程序设计与实现(LW+源码+讲解)
  • 为AI聊天工具添加一个知识系统 之77 详细设计之18 正则表达式 之5
  • 【Elasticsearch】 索引模板 ignore_missing_component_templates
  • Github 2025-01-29 C开源项目日报 Top10
  • 文件上传2
  • Unity敌人逻辑笔记
  • 高级编码参数
  • DeepSeek-R1:通过强化学习激励大型语言模型(LLMs)的推理能力
  • leetcode——合并K个有序链表(java)
  • 【Valgrind】安装报错: 报错有未满足的依赖关系: libc6,libc6-dbg
  • vue3和vue2的区别有哪些差异点
  • 论文笔记(六十三)Understanding Diffusion Models: A Unified Perspective(六)(完结)
  • NPM 使用介绍