Java Stream 使用 Fork/Join框架的分治任务模型
AbstractTask
类
AbstractTask
的核心作用是:定义一个通用的、基于 Fork/Join 框架的分治任务模型。它封装了任务分裂、子任务管理和结果传递的通用逻辑,使得子类可以专注于实现具体的计算任务,而无需关心并行的底层细节。
// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractTask<P_IN, P_OUT, R,K extends AbstractTask<P_IN, P_OUT, R, K>>extends CountedCompleter<R> {
// ... existing code ...
extends CountedCompleter<R>
: 这是整个并行机制的核心。它继承自CountedCompleter
,这是一种特殊的ForkJoinTask
。CountedCompleter
能够自动管理子任务的完成状态。当一个任务的所有子任务都执行完毕后,它的onCompletion
方法会被自动回调,这非常适合处理分治算法中“合并”结果的阶段。- 泛型参数:
P_IN
: 流管道的输入元素类型。P_OUT
: 流管道的输出元素类型。R
: 任务的中间结果或最终结果的类型。K
: 任务自身的类型,用于类型安全的父子、兄弟节点引用(一种典型的递归泛型模式)。
核心字段
helper
:PipelineHelper
对象,封装了从源到当前操作的所有中间操作。所有子任务共享同一个helper
。spliterator
: 每个任务都关联一个Spliterator
,它描述了当前任务需要处理的数据源部分。targetSize
: 目标叶子任务大小。这是一个阈值,用于决定任务是应该继续分裂还是直接计算。leftChild
,rightChild
: 指向左右子任务的引用,用于构建任务树。localResult
: 存储当前任务计算出的本地结果。
compute()
方法
compute()
方法是 AbstractTask
的灵魂,它实现了通用的分治(Divide and Conquer)算法。
// ... existing code ...@Overridepublic void compute() {Spliterator<P_IN> rs = spliterator, ls; // right, left spliteratorslong sizeEstimate = rs.estimateSize();long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;// 1. 分裂循环 (Divide)while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {K leftChild, rightChild, taskToFork;task.leftChild = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork(); // 异步执行一个子任务sizeEstimate = rs.estimateSize();}// 2. 计算叶子节点 (Conquer)task.setLocalResult(task.doLeaf());// 3. 完成task.tryComplete();}
// ... existing code ...
分裂循环 (Divide):
while
循环是分裂阶段。- 只要当前任务的数据量(
sizeEstimate
)大于阈值(sizeThreshold
),并且spliterator
可以成功分裂 (trySplit()
),循环就会继续。 - 在循环内部,它通过
makeChild
(一个抽象方法,由子类实现)创建左右两个子任务。 taskToFork.fork()
: 将其中一个子任务提交到ForkJoinPool
中异步执行。- 另一个子任务则由当前线程继续在
while
循环中处理(这是一种尾递归优化,避免了方法调用的深度堆栈)。 forkRight
变量用于交替fork
左右子任务,以应对Spliterator
可能产生的不均衡分裂。
- 只要当前任务的数据量(
计算叶子节点 (Conquer): 当任务小到无法再分裂时,循环结束。
task.doLeaf()
: 调用doLeaf()
这个抽象方法。这是子类需要实现的核心计算逻辑。比如,ReduceTask
会在这里执行归约操作,CollectorTask
会在这里把元素收集到Builder
中。task.setLocalResult(...)
: 将计算结果保存在localResult
字段中。
完成 (Combine):
task.tryComplete()
: 通知CountedCompleter
框架,当前叶子任务已完成。这会触发父任务的待完成计数器减一。当计数器归零时,父任务的onCompletion
方法会被调用,从而实现结果的合并。
抽象方法与子类职责
AbstractTask
定义了两个关键的抽象方法,将“做什么”与“怎么做”分离:
protected abstract K makeChild(Spliterator<P_IN> spliterator);
: 子类必须告诉AbstractTask
如何创建自己的同类实例。这使得分裂逻辑可以通用。protected abstract R doLeaf();
: 子类必须定义在叶子节点上执行的具体计算。这是任务的核心价值所在。
此外,子类通常会重写 onCompletion(CountedCompleter<?> caller)
方法,以定义如何将子任务的结果合并到父任务中。
AbstractShortCircuitTask
AbstractShortCircuitTask
的核心目的是为 并行流中的短路操作(Short-Circuiting Operations) 提供一个统一的、可复用的执行框架。
anyMatch
, allMatch
, noneMatch
,以及 findFirst
, findAny
都属于短路操作。这些操作的共同特点是:一旦在并行处理的某个分支中找到了一个可以决定最终结果的元素,就需要一种机制来尽快地通知所有其他并行的任务“停止工作”,从而避免不必要的计算。AbstractShortCircuitTask
就是为了实现这个“通知并停止”的机制而设计的。
它继承自 AbstractTask
,后者是所有 Stream 并行任务的基类,提供了任务拆分(trySplit
)和子任务创建(makeChild
)等基础功能。
核心设计思想与关键字段
AbstractShortCircuitTask
的设计精髓在于它如何管理和同步并行任务之间的状态。
// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>extends AbstractTask<P_IN, P_OUT, R, K> {/*** The result for this computation; this is shared among all tasks and set* exactly once*/protected final AtomicReference<R> sharedResult;/*** Indicates whether this task has been canceled.* ...*/protected volatile boolean canceled;
// ... existing code ...
protected final AtomicReference<R> sharedResult;
这是整个设计的核心。它是一个被所有相关 Fork/Join 任务(包括根任务和所有子任务)共享的原子引用。- 共享:子任务在创建时会直接引用父任务的
sharedResult
实例,确保大家操作的是同一个对象。 - 原子性:使用
AtomicReference
保证了多线程环境下对结果的写入是线程安全的。 - 作用:它的初始值为
null
。任何一个子任务一旦找到了可以“短路”的最终结果,就会尝试通过compareAndSet(null, result)
将这个结果写入sharedResult
。由于 CAS 操作的原子性,只有第一个成功写入的线程会生效。一旦sharedResult
不再是null
,就标志着整个大任务的最终结果已经产生。
- 共享:子任务在创建时会直接引用父任务的
protected volatile boolean canceled;
这是一个任务级的取消标志。volatile
关键字确保了其在多线程之间的可见性。它主要用于实现有序短路操作,比如findFirst
。
compute()
- 任务执行与短路检查
compute()
方法是 Fork/Join 任务的执行入口。AbstractShortCircuitTask
重写了这个方法,在标准的任务拆分逻辑中加入了短路检查。
// ... existing code ...@Overridepublic void compute() {// ... (任务拆分准备) ...AtomicReference<R> sr = sharedResult;R result;while ((result = sr.get()) == null) { // 核心检查点if (task.taskCanceled()) { // 检查任务是否被取消result = task.getEmptyResult();break;}// ... (标准的任务拆分和执行逻辑) ...// 如果当前任务块足够小,就执行 doLeaf()if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {result = task.doLeaf();break;}// ... (否则,拆分成左右子任务并 fork 一个) ...}task.setLocalResult(result);task.tryComplete();}protected boolean taskCanceled() {boolean cancel = canceled;if (!cancel) {for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())cancel = parent.canceled;}return cancel;}
// ... existing code ...
compute
方法的核心是一个 while
循环,其循环条件 (result = sr.get()) == null
是短路机制的关键。在每次进行任务拆分或执行叶子任务(doLeaf
)之前,它都会检查共享的 sharedResult
。如果发现 sharedResult
已经被其他任务写入了值,循环就会立即终止,当前任务也就不会再进行后续的计算,从而实现了短路。
shortCircuit(R result)
- 设置短路结果
当一个叶子任务(在 doLeaf()
方法中)计算出了一个可以决定全局结果的值时,它会调用这个方法。
// ... existing code ...protected void shortCircuit(R result) {if (result != null)sharedResult.compareAndSet(null, result);}
// ... existing code ...
这个方法非常简单,就是尝试用 CAS 操作将结果写入 sharedResult
。第一个成功的任务会“胜出”,后续所有调用 shortCircuit
的任务都会因为 sharedResult
不再是 null
而失败,这保证了结果只会被设置一次。
其它方法
getEmptyResult()
- 获取默认结果
这是一个抽象方法,需要子类去实现。它定义了在没有发生短路,并且所有元素都处理完毕后,应该返回的默认结果。
- 对于
anyMatch
,如果遍历完都没找到匹配项,默认结果是false
。 - 对于
allMatch
,如果遍历完所有项都匹配,默认结果是true
。 - 对于
findFirst
,如果流为空,默认结果是Optional.empty()
。
cancel()
和 cancelLaterNodes()
- 有序操作的短路
对于像 findFirst
这样的有序操作,仅仅找到一个结果是不够的。我们必须确保找到的是 遭遇顺序(encounter order) 中最靠前的那一个。
cancelLaterNodes()
方法就是为此设计的。当一个任务(比如处理流中 0-100
号元素的任务)找到了一个结果后,它会调用 cancelLaterNodes()
。这个方法会沿着任务树向上回溯,并把所有在它“右边”的兄弟任务以及父节点的右兄弟任务的 canceled
标志都设置为 true
。这些被取消的任务在 compute
循环的 taskCanceled()
检查点就会提前退出。
这确保了只有遭遇顺序最靠前的任务产生的结果才会被接受,所有处理更后面元素的任务都会被取消。
子类如何使用
像我们之前分析的 MatchTask
和 FindTask
都会继承 AbstractShortCircuitTask
。它们需要:
- 提供
getEmptyResult()
的具体实现。 - 实现
doLeaf()
方法。在这个方法里,它们会处理一小块数据,如果发现了短路条件,就调用shortCircuit(result)
。
总结
AbstractShortCircuitTask
是 Java Stream 并行短路操作的基石。它通过一个所有任务共享的 AtomicReference
(sharedResult
) 作为通信和同步的媒介,构建了一个优雅而高效的并行短路框架。
- 核心机制:任务在执行前检查
sharedResult
,如果已有结果则提前退出。 - 结果写入:叶子任务通过
shortCircuit()
方法以原子方式写入第一个有效结果。 - 有序性支持:通过
cancel()
和cancelLaterNodes()
机制,为findFirst
等有序操作提供了额外的取消逻辑,保证了结果的正确性。
这个类的设计充分体现了在并行计算中,如何通过共享状态和原子操作来协调多个并发任务,以最高效率达成共同的目标。