当前位置: 首页 > news >正文

Java Stream 使用 Fork/Join框架的分治任务模型

AbstractTask 类

AbstractTask 的核心作用是:定义一个通用的、基于 Fork/Join 框架的分治任务模型。它封装了任务分裂、子任务管理和结果传递的通用逻辑,使得子类可以专注于实现具体的计算任务,而无需关心并行的底层细节。


 

// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractTask<P_IN, P_OUT, R,K extends AbstractTask<P_IN, P_OUT, R, K>>extends CountedCompleter<R> {
// ... existing code ...
  • extends CountedCompleter<R>: 这是整个并行机制的核心。它继承自 CountedCompleter,这是一种特殊的 ForkJoinTaskCountedCompleter 能够自动管理子任务的完成状态。当一个任务的所有子任务都执行完毕后,它的 onCompletion 方法会被自动回调,这非常适合处理分治算法中“合并”结果的阶段。
  • 泛型参数:
    • P_IN: 流管道的输入元素类型。
    • P_OUT: 流管道的输出元素类型。
    • R: 任务的中间结果或最终结果的类型。
    • K: 任务自身的类型,用于类型安全的父子、兄弟节点引用(一种典型的递归泛型模式)。

核心字段

  • helperPipelineHelper 对象,封装了从源到当前操作的所有中间操作。所有子任务共享同一个 helper
  • spliterator: 每个任务都关联一个 Spliterator,它描述了当前任务需要处理的数据源部分。
  • targetSize: 目标叶子任务大小。这是一个阈值,用于决定任务是应该继续分裂还是直接计算。
  • leftChildrightChild: 指向左右子任务的引用,用于构建任务树。
  • localResult: 存储当前任务计算出的本地结果。

compute() 方法

compute() 方法是 AbstractTask 的灵魂,它实现了通用的分治(Divide and Conquer)算法。

// ... existing code ...@Overridepublic void compute() {Spliterator<P_IN> rs = spliterator, ls; // right, left spliteratorslong sizeEstimate = rs.estimateSize();long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;// 1. 分裂循环 (Divide)while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {K leftChild, rightChild, taskToFork;task.leftChild  = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork(); // 异步执行一个子任务sizeEstimate = rs.estimateSize();}// 2. 计算叶子节点 (Conquer)task.setLocalResult(task.doLeaf());// 3. 完成task.tryComplete();}
// ... existing code ...
  1. 分裂循环 (Divide)while 循环是分裂阶段。

    • 只要当前任务的数据量(sizeEstimate)大于阈值(sizeThreshold),并且 spliterator 可以成功分裂 (trySplit()),循环就会继续。
    • 在循环内部,它通过 makeChild(一个抽象方法,由子类实现)创建左右两个子任务。
    • taskToFork.fork(): 将其中一个子任务提交到 ForkJoinPool 中异步执行。
    • 另一个子任务则由当前线程继续在 while 循环中处理(这是一种尾递归优化,避免了方法调用的深度堆栈)。
    • forkRight 变量用于交替 fork 左右子任务,以应对 Spliterator 可能产生的不均衡分裂。
  2. 计算叶子节点 (Conquer): 当任务小到无法再分裂时,循环结束。

    • task.doLeaf(): 调用 doLeaf() 这个抽象方法。这是子类需要实现的核心计算逻辑。比如,ReduceTask 会在这里执行归约操作,CollectorTask 会在这里把元素收集到 Builder 中。
    • task.setLocalResult(...): 将计算结果保存在 localResult 字段中。
  3. 完成 (Combine):

    • task.tryComplete(): 通知 CountedCompleter 框架,当前叶子任务已完成。这会触发父任务的待完成计数器减一。当计数器归零时,父任务的 onCompletion 方法会被调用,从而实现结果的合并。

抽象方法与子类职责

AbstractTask 定义了两个关键的抽象方法,将“做什么”与“怎么做”分离:

  • protected abstract K makeChild(Spliterator<P_IN> spliterator);子类必须告诉 AbstractTask 如何创建自己的同类实例。这使得分裂逻辑可以通用。
  • protected abstract R doLeaf();子类必须定义在叶子节点上执行的具体计算。这是任务的核心价值所在。

此外,子类通常会重写 onCompletion(CountedCompleter<?> caller) 方法,以定义如何将子任务的结果合并到父任务中。

AbstractShortCircuitTask

AbstractShortCircuitTask 的核心目的是为 并行流中的短路操作(Short-Circuiting Operations) 提供一个统一的、可复用的执行框架。

anyMatchallMatchnoneMatch,以及 findFirstfindAny 都属于短路操作。这些操作的共同特点是:一旦在并行处理的某个分支中找到了一个可以决定最终结果的元素,就需要一种机制来尽快地通知所有其他并行的任务“停止工作”,从而避免不必要的计算。AbstractShortCircuitTask 就是为了实现这个“通知并停止”的机制而设计的。

它继承自 AbstractTask,后者是所有 Stream 并行任务的基类,提供了任务拆分(trySplit)和子任务创建(makeChild)等基础功能。

核心设计思想与关键字段

AbstractShortCircuitTask 的设计精髓在于它如何管理和同步并行任务之间的状态。

// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>extends AbstractTask<P_IN, P_OUT, R, K> {/*** The result for this computation; this is shared among all tasks and set* exactly once*/protected final AtomicReference<R> sharedResult;/*** Indicates whether this task has been canceled.* ...*/protected volatile boolean canceled;
// ... existing code ...
  • protected final AtomicReference<R> sharedResult; 这是整个设计的核心。它是一个被所有相关 Fork/Join 任务(包括根任务和所有子任务)共享的原子引用。

    • 共享:子任务在创建时会直接引用父任务的 sharedResult 实例,确保大家操作的是同一个对象。
    • 原子性:使用 AtomicReference 保证了多线程环境下对结果的写入是线程安全的。
    • 作用:它的初始值为 null。任何一个子任务一旦找到了可以“短路”的最终结果,就会尝试通过 compareAndSet(null, result) 将这个结果写入 sharedResult。由于 CAS 操作的原子性,只有第一个成功写入的线程会生效。一旦 sharedResult 不再是 null,就标志着整个大任务的最终结果已经产生。
  • protected volatile boolean canceled; 这是一个任务级的取消标志。volatile 关键字确保了其在多线程之间的可见性。它主要用于实现有序短路操作,比如 findFirst

compute() - 任务执行与短路检查

compute() 方法是 Fork/Join 任务的执行入口。AbstractShortCircuitTask 重写了这个方法,在标准的任务拆分逻辑中加入了短路检查。

// ... existing code ...@Overridepublic void compute() {// ... (任务拆分准备) ...AtomicReference<R> sr = sharedResult;R result;while ((result = sr.get()) == null) { // 核心检查点if (task.taskCanceled()) { // 检查任务是否被取消result = task.getEmptyResult();break;}// ... (标准的任务拆分和执行逻辑) ...// 如果当前任务块足够小,就执行 doLeaf()if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {result = task.doLeaf();break;}// ... (否则,拆分成左右子任务并 fork 一个) ...}task.setLocalResult(result);task.tryComplete();}protected boolean taskCanceled() {boolean cancel = canceled;if (!cancel) {for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())cancel = parent.canceled;}return cancel;}
// ... existing code ...

compute 方法的核心是一个 while 循环,其循环条件 (result = sr.get()) == null 是短路机制的关键。在每次进行任务拆分或执行叶子任务(doLeaf)之前,它都会检查共享的 sharedResult。如果发现 sharedResult 已经被其他任务写入了值,循环就会立即终止,当前任务也就不会再进行后续的计算,从而实现了短路。

shortCircuit(R result) - 设置短路结果

当一个叶子任务(在 doLeaf() 方法中)计算出了一个可以决定全局结果的值时,它会调用这个方法。

// ... existing code ...protected void shortCircuit(R result) {if (result != null)sharedResult.compareAndSet(null, result);}
// ... existing code ...

这个方法非常简单,就是尝试用 CAS 操作将结果写入 sharedResult。第一个成功的任务会“胜出”,后续所有调用 shortCircuit 的任务都会因为 sharedResult 不再是 null 而失败,这保证了结果只会被设置一次。

其它方法

getEmptyResult() - 获取默认结果

这是一个抽象方法,需要子类去实现。它定义了在没有发生短路,并且所有元素都处理完毕后,应该返回的默认结果。

  • 对于 anyMatch,如果遍历完都没找到匹配项,默认结果是 false
  • 对于 allMatch,如果遍历完所有项都匹配,默认结果是 true
  • 对于 findFirst,如果流为空,默认结果是 Optional.empty()

cancel() 和 cancelLaterNodes() - 有序操作的短路

对于像 findFirst 这样的有序操作,仅仅找到一个结果是不够的。我们必须确保找到的是 遭遇顺序(encounter order) 中最靠前的那一个。

cancelLaterNodes() 方法就是为此设计的。当一个任务(比如处理流中 0-100 号元素的任务)找到了一个结果后,它会调用 cancelLaterNodes()。这个方法会沿着任务树向上回溯,并把所有在它“右边”的兄弟任务以及父节点的右兄弟任务的 canceled 标志都设置为 true。这些被取消的任务在 compute 循环的 taskCanceled() 检查点就会提前退出。

这确保了只有遭遇顺序最靠前的任务产生的结果才会被接受,所有处理更后面元素的任务都会被取消。

子类如何使用

像我们之前分析的 MatchTask 和 FindTask 都会继承 AbstractShortCircuitTask。它们需要:

  1. 提供 getEmptyResult() 的具体实现。
  2. 实现 doLeaf() 方法。在这个方法里,它们会处理一小块数据,如果发现了短路条件,就调用 shortCircuit(result)

总结

AbstractShortCircuitTask 是 Java Stream 并行短路操作的基石。它通过一个所有任务共享的 AtomicReference (sharedResult) 作为通信和同步的媒介,构建了一个优雅而高效的并行短路框架。

  • 核心机制:任务在执行前检查 sharedResult,如果已有结果则提前退出。
  • 结果写入:叶子任务通过 shortCircuit() 方法以原子方式写入第一个有效结果。
  • 有序性支持:通过 cancel() 和 cancelLaterNodes() 机制,为 findFirst 等有序操作提供了额外的取消逻辑,保证了结果的正确性。

这个类的设计充分体现了在并行计算中,如何通过共享状态和原子操作来协调多个并发任务,以最高效率达成共同的目标。

http://www.lryc.cn/news/615490.html

相关文章:

  • Windows 安装 Xinference 速记
  • CPU缓存(CPU Cache)和TLB(Translation Lookaside Buffer)缓存现代计算机体系结构中用于提高性能的关键技术
  • 【线性代数】线性方程组与矩阵——(2)矩阵与线性方程组的解
  • 计算机网络:深入了解CIDR地址块如何利用VLSM进行子网划分的过程
  • 前端视角下关于 WebSocket 的简单理解
  • 如何在 Ubuntu 24.04 LTS Linux 上安装 Azure Data Studio
  • 【排序算法】④堆排序
  • 基于STM32H5的非循环GPDMA链表使用
  • LangChain-Unstructured 基础使用:PDF 与 Markdown 处理解析
  • 基于IPD体系的研发项目范围管理
  • 【网络与爬虫 52】Scrapyd-k8s集群化爬虫部署:Kubernetes原生分布式爬虫管理平台实战指南
  • 一个app项目周期是多久?
  • Java异常:认识异常、异常的作用、自定义异常
  • 世界时(Universal Time, UT)的定义与详解
  • 小学数学训练闭环:出题、作答、批改一体化方案实践
  • [Oracle] MAX()和MIN()函数
  • 【Python 高频 API 速学 ④】
  • LintCode第547题-两数组的交集
  • 腾讯COS云存储入门
  • 浅尝AI辅助C转Verilog方法
  • 新手小白使用jQuery在实际开发中常用到的经验
  • 第二十天:余数相同问题
  • 《Resolving tissue complexity by multimodal spatial omics modeling with MISO》
  • 【面试场景题】微博热点新闻系统设计方案
  • day18 - CSS函数
  • nginx高性能web服务器
  • 基于Prometheus、Grafana、Loki与Tempo的统一监控平台故障排查与解决方案
  • java组件安全vulhub靶场
  • [激光原理与应用-206]:光学器件 - SESAM - 基本结构与工作原理
  • 通用AGI到来,记忆仍需要一点旧颜色