当前位置: 首页 > news >正文

Apache Ignite 核心组件:GridClosureProcessor解析

这是一个 Apache Ignite 中非常核心的组件 —— GridClosureProcessor,它是 分布式闭包(Closure)执行的调度中枢,负责在集群节点上异步执行用户提交的任务(如 RunnableClosure)。

我们来逐层深入理解它的设计思想、关键机制和代码逻辑。


🧱 一、类概览:GridClosureProcessor

public class GridClosureProcessor extends GridProcessorAdapter
  • 职责:处理所有基于闭包(函数式)的远程执行请求
  • 常见用途:
    • compute().run(Runnable)
    • compute().call(Closure)
    • compute().broadcast(Closure)
    • cache().affinity().run(...)
  • 它是 ComputeTask 的底层支撑模块

🔩 二、关键字段解析

字段类型作用
poolsPoolProcessor线程池管理器,用于获取执行任务的线程池
busyLockGridSpinReadWriteLock控制组件在 停止期间不接受新任务
stoppingboolean标记当前处理器是否正在停止

⚠️ 这三个字段共同实现了 “优雅关闭” 的核心逻辑。


🔒 三、busyLock:优雅关闭的关键机制

1. 什么是 GridSpinReadWriteLock

  • Ignite 自定义的 自旋读写锁
  • 特点:
    • 读锁可重入、允许多个线程同时持有
    • 写锁独占,用于“停止”阶段
    • 使用 自旋 + sleep 避免线程频繁阻塞唤醒

2. 读锁(readLock()):

  • 所有任务提交方法(runAsync, callAsync, broadcast)都先获取读锁
  • 表示:“我正在使用这个处理器”
  • 允许多个线程并发提交任务

3. 写锁(tryWriteLock(...)):

  • onKernalStop(...) 中使用
  • 目的:阻止任何新任务提交,并标记为“停止中”

🛑 四、onKernalStop(...):优雅关闭流程

@Override
public void onKernalStop(boolean cancel) {boolean interrupted = false;while (true) {try {if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))break;elseThread.sleep(200);}catch (InterruptedException ignore) {interrupted = true;}}try {if (interrupted)Thread.currentThread().interrupt();stopping = true; // 标记为停止状态}finally {busyLock.writeUnlock();}
}

🔍 流程详解:

  1. 尝试获取写锁

    • tryWriteLock(200ms):尝试在 200ms 内获取写锁
    • 如果有线程持有读锁(即正在提交任务),则失败
    • 失败后 Thread.sleep(200),然后重试
  2. 为什么是“Busy Wait”?

    • 注解 @SuppressWarnings("BusyWait") 表示这是有意为之的忙等待
    • 目的:尽快完成关闭,避免长时间阻塞
    • 每 200ms 尝试一次,不会过度消耗 CPU
  3. 处理中断

    • 如果等待期间被中断,记录 interrupted = true
    • 最后恢复中断状态(线程安全最佳实践)
  4. 设置 stopping = true

    • 获取写锁后,设置标志位
    • 之后所有 runAsync 等调用都会被拒绝
  5. 释放写锁

    • 即使发生异常,也确保释放锁

✅ 这是一个典型的 “关闭守卫”模式:先阻止新请求,再清理资源。


🚀 五、任务提交方法分析

所有任务提交方法都遵循统一模式:

busyLock.readLock();
try {if (stopping) reject();// 提交任务
} finally {busyLock.readUnlock();
}

我们以 runAsync(...) 为例:

runAsync(...):运行一批 Runnable

public ComputeTaskInternalFuture<?> runAsync(...) {assert mode != null;assert !F.isEmpty(jobs);busyLock.readLock(); // 获取读锁try {if (stopping) {return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));}if (F.isEmpty(nodes))return finishedFuture(U.emptyTopologyException());ctx.task().setThreadContext(TC_SUBGRID, nodes);return ctx.task().execute(new T1(mode, jobs), null, sys, execName);}finally {busyLock.readUnlock(); // 释放读锁}
}
关键点:
  • stopping 检查:如果正在停止,直接返回失败 future
  • nodes 检查:拓扑为空则返回空拓扑异常
  • ctx.task().execute(...):交给 TaskProcessor 执行(T1 是一个内部任务类型)
  • 使用 sys 参数决定使用 系统线程池 还是 公共线程池

callAsync(...):远程调用 Closure

public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
  • 执行一个带返回值的函数(Closure<T,R>
  • 返回 ComputeTaskInternalFuture<R>,可获取结果

broadcast(...):广播到所有节点

public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
  • nodes 列表中的每个节点上执行 job
  • 返回一个 Future<Collection<R>>,包含所有节点的返回值

affinityRun(...):基于数据亲和性执行

public ComputeTaskInternalFuture<?> affinityRun(...)
  • 关键用途:将任务发送到 特定缓存分区(partition)的主节点
  • 流程:
    1. 获取当前拓扑版本 readyAffinityVersion()
    2. 使用 ctx.affinity().mapPartitionToNode(...) 找到负责该分区的节点
    3. 只在那个节点上执行任务
  • 优势:本地化执行,避免数据移动,性能极高

💡 这是 Ignite 实现“移动计算而非数据”的核心机制之一。


🧩 六、T1, T8, T11, T4 是什么?

这些是 内部任务类(定义在 GridTaskInternalFuture 或内部类中),用于包装用户任务:

任务类包装的任务类型
T1GridClosureCallMode + Collection<Runnable>
T8IgniteClosure<T,R>
T11Broadcast 任务
T4Affinity 任务

它们都继承自 ComputeTaskAdapter,由 TaskProcessor 调度执行。


🎯 七、整体架构图(简化)

+---------------------+
|  User Code          |
|  compute().run(...) |
+----------+----------+|v
+---------------------+
| GridClosureProcessor|
| - busyLock          |
| - stopping          |
+----------+----------+|v
+---------------------+
|  TaskProcessor      |
|  execute(Task)      |
+----------+----------+|v
+---------------------+
|  PoolProcessor      |
|  系统/公共线程池     |
+---------------------+

✅ 八、设计亮点总结

特性说明
读写锁控制关闭读锁允许多任务并发提交,写锁确保关闭时原子性
优雅拒绝新任务stopping 标志 + finishedFuture 快速失败
支持多种执行模式单节点、广播、亲和性执行
与 Task 子系统集成复用 TaskProcessor 的调度能力
线程安全所有提交路径都受锁保护
可观测性调试日志、异常信息清晰

📌 九、一句话总结

GridClosureProcessor 是 Ignite 的 分布式任务调度入口,它通过 读写锁机制 实现了 高并发提交 + 优雅关闭,并支持 普通执行、广播、数据亲和性执行 等多种模式,是 Compute 子系统的核心引擎。


💡 十、你可以借鉴的设计模式

1. 关闭守卫模式(Shutdown Guard)

private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;public void submit(Runnable task) {shutdownLock.readLock().lock();try {if (shuttingDown) throw new RejectedExecutionException();// 执行任务} finally {shutdownLock.readLock().unlock();}
}public void shutdown() {shutdownLock.writeLock().lock();try {shuttingDown = true;} finally {shutdownLock.writeLock().unlock();}
}

2. 快速失败(Fail-Fast)

  • 不让任务进入队列,而是在入口就拒绝
  • 返回一个“已完成的失败 Future”,避免资源浪费

🏁 结语

GridClosureProcessor 虽然代码量不大,但它体现了分布式系统中 资源管理、并发控制、生命周期管理 的最佳实践。理解它,有助于你设计自己的 高可用、可扩展的任务调度系统

http://www.lryc.cn/news/617184.html

相关文章:

  • pom.xml父子模块配置
  • 【Maven】01 - 入门篇
  • Maven 的 module 管理
  • 基于Spring Data Elasticsearch的分布式全文检索与集群性能优化实践指南
  • Maven 报错:Blocked mirror for repositories【完美解决】
  • 直接编辑pdf文件教程
  • SpringBoot 自动配置核心机制(面试高频考点)
  • wpf问题记录
  • 【2025最新版】PDF24 Creator,PDF编辑,合并分割,格式转换全能工具箱,本地离线版本,完全免费!
  • 【Maven】02 - 进阶篇
  • 《深度剖析前端框架中错误边界:异常处理的基石与进阶》
  • 华为虚拟防火墙配置案例详解
  • 基于SpringBoot+Uniapp的血压监控小程序(Echarts图形化分析)
  • 华为watch5心率变异性测量法的底层逻辑
  • Django ORM查询技巧全解析
  • 41.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--扩展功能--集成网关--网关集成Swagger
  • Spring MVC 注解参数接收详解:@RequestBody、@PathVariable 等区别与使用场景
  • kafka 中的Broker 是什么?它在集群中起什么作用?
  • [Oracle] UNPIVOT 列转行
  • CodeBuddy IDE完全食用手册:从安装到生产力爆发的技术流解剖
  • 视频前处理技术全解析:从基础到前沿
  • 【安全发布】微软2025年07月漏洞通告
  • AI大模型:(二)5.1 文生视频(Text-to-Video)模型发展史
  • 从ELF到进程间通信:剖析Linux程序的加载与交互机制
  • 音视频学习(五十三):音频重采样
  • 动态创建可变对象:Python类工厂函数深度解析
  • Vue3从入门到精通:3.1 性能优化策略深度解析
  • Unity跨平台性能优化全攻略:PC与安卓端深度优化指南 - CPU、GPU、内存优化 实战案例C#
  • docker集群
  • 在Linux中部署tomcat