Java Stream核心:ReferencePipeline解析
ReferencePipeline
ReferencePipeline
继承了 AbstractPipeline
,并实现了 Stream
接口。它的核心作用是:
- 承上启下:作为
AbstractPipeline
的直接子类,它实现了父类中所有与“形状”相关的抽象方法,为处理对象引用类型(非int
,long
,double
等基本类型)的流提供了具体的实现。 - 实现
Stream
接口:它实现了Stream
接口中定义的所有中间操作(如map
,filter
,flatMap
)和终端操作(如forEach
,collect
)。
下面我们来具体分析它是如何实现这些特性的。
实现 AbstractPipeline
的抽象方法(“形状”相关)
ReferencePipeline
首先要做的,就是告诉 AbstractPipeline
这个骨架,如何处理“对象引用”这种形状的数据。
// ... existing code ...// Shape-specific methods@Overridefinal StreamShape getOutputShape() {return StreamShape.REFERENCE;}@Overridefinal <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,Spliterator<P_IN> spliterator,boolean flattenTree,IntFunction<P_OUT[]> generator) {return Nodes.collect(helper, spliterator, flattenTree, generator);}@Overridefinal <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,Supplier<Spliterator<P_IN>> supplier,boolean isParallel) {return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);}@Overridefinal Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {return new StreamSpliterators.DelegatingSpliterator<>(supplier);}@Overridefinal boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
// ... existing code ...}@Overridefinal Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {return Nodes.builder(exactSizeIfKnown, generator);}
// ... existing code ...
getOutputShape()
: 明确告诉框架,这个管道流出的是对象引用。evaluateToNode(...)
: 当需要将流的结果收集到一个Node
中时(例如在并行计算中),它委托给Nodes.collect()
这个工具方法来完成。wrap(...)
: 当需要将一个上游Spliterator
和下游操作包装成一个新的Spliterator
时,它创建一个StreamSpliterators.WrappingSpliterator
实例。这个包装器就是实现延迟计算的关键。lazySpliterator(...)
: 当从Supplier
创建流时,它创建一个StreamSpliterators.DelegatingSpliterator
,实现懒加载。makeNodeBuilder(...)
: 当需要构建一个Node
时,它委托给Nodes.builder()
来创建一个适用于对象引用的Node.Builder
。
通过实现这些方法,ReferencePipeline
就具备了处理对象类型流所需的基础设施。
实现 Stream
接口的中间操作
这是 ReferencePipeline
最核心的部分,它为用户提供了丰富的流操作方法。其实现模式高度一致,堪称典范。我们以 filter
和 map
为例:
filter(Predicate)
// ... existing code ...@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}
// ... existing code ...
- 返回新阶段: 它
new
了一个StatelessOp
对象。StatelessOp
是ReferencePipeline
的一个静态内部类,它本身也是一个ReferencePipeline
。构造函数new StatelessOp<>(this, ...)
将当前阶段 (this
) 作为上游,从而将新的filter
操作链接到流水线的末端。 - 传递标志位: 它告诉框架,
filter
操作是一个NOT_SIZED
操作,因为过滤后元素的数量是不确定的。 - 实现
opWrapSink
: 这是魔法发生的地方。它重写了AbstractPipeline
中的核心抽象方法opWrapSink
。- 目的: 这个方法的任务是“包装”下游传来的
Sink
。 - 实现: 它创建了一个
Sink.ChainedReference
。这个新的Sink
在它的accept
方法中执行了filter
的核心逻辑:if (predicate.test(u))
。只有当元素满足条件时,它才会调用下游downstream.accept(u)
,将元素传递下去。 begin
方法: 它调用downstream.begin(-1)
,通知下游数据量未知。
- 目的: 这个方法的任务是“包装”下游传来的
map(Function)
// ... existing code ...@Overridepublic final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));}};}};}
// ... existing code ...
map
的实现与 filter
几乎完全相同,只是:
- 标志位不同:
map
操作可能会破坏原有的排序和独特性,所以它传递了NOT_SORTED | NOT_DISTINCT
。 opWrapSink
的逻辑不同:它的accept
方法执行的是map
的逻辑:downstream.accept(mapper.apply(u))
,即将转换后的新元素传递给下游。
几乎所有的无状态中间操作(filter
, map
, peek
等)都遵循这个模式:
- 创建一个新的
StatelessOp
阶段,并链接到当前流水线。 - 提供正确的操作标志位。
- 在
opWrapSink
中创建一个新的Sink
,该Sink
封装了当前操作的业务逻辑,并负责将处理结果传递给下游的Sink
。
flatMap
flatMap
的实现相对复杂一些,因为它需要处理由 mapper
函数生成的内层 Stream。
ReferencePipeline.java
// ...@Overridepublic final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {Objects.requireNonNull(mapper);// 返回一个新的 StatelessOp,其 opWrapSink 方法会创建一个特殊的 Sinkreturn new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {// ... (内部创建了一个实现了 Sink<P_OUT> 的 FlatMap 类) ...// FlatMap 类的 accept(P_OUT e) 方法会:// 1. 调用 mapper.apply(e) 得到一个 Stream<? extends R> result// 2. 如果 result 不为 null,则遍历 result 中的每个元素 r// 3. 对每个 r 调用下游 sink.accept(r)// 它还需要处理短路 (cancellationRequested)boolean shorts = isShortCircuitingPipeline();final class FlatMap implements Sink<P_OUT>, Predicate<R> {boolean cancel;@Override public void begin(long size) { sink.begin(-1); }@Override public void end() { sink.end(); }@Overridepublic void accept(P_OUT e) {try (Stream<? extends R> result = mapper.apply(e)) { // try-with-resources 确保内部流被关闭if (result != null) {if (shorts) // 如果整个管道是短路的// 使用 allMatch 和 Predicate<R> 来迭代内部流,并允许提前终止result.sequential().allMatch(this);else// 否则,简单地将内部流的每个元素传递给下游 sinkresult.sequential().forEach(sink);}}}@Overridepublic boolean cancellationRequested() {// 如果当前 sink 或下游 sink 请求取消,则返回 truereturn cancel || (cancel |= sink.cancellationRequested());}// 这个 test 方法是为 shorts && result.sequential().allMatch(this) 服务的// allMatch 会在 Predicate.test 返回 false 时停止@Overridepublic boolean test(R output) {if (!cancel) {sink.accept(output); // 将元素传递给下游// 如果下游请求取消,则设置 cancel 标志并返回 false 以停止 allMatchreturn !(cancel |= sink.cancellationRequested());} else {return false; // 如果已经取消,则返回 false}}}return new FlatMap();}};}
// ...
flatMap
的Sink
实现 (FlatMap
内部类) 稍微复杂:- 它的
accept(P_OUT e)
方法接收上游元素e
。 - 它调用
mapper.apply(e)
来获取一个内层Stream<? extends R>
。 - 然后它遍历这个内层
Stream
,并将内层流的每个元素传递给下游的sink
。 - 它还需要正确处理短路逻辑,如果下游
sink
请求取消,内层流的遍历也应该停止。 try-with-resources
用于确保由mapper
返回的流被正确关闭。
- 它的
Stream
接口实现:有状态操作 (Stateful Operations)
对于有状态操作,如 distinct
, sorted
, limit
, skip
,ReferencePipeline
并没有直接实现它们的复杂逻辑。
// 在 Stream.java 中定义
Stream<T> sorted();
Stream<T> distinct();// 在 ReferencePipeline.java 中,它们通常被委托给专门的 Ops 类
// (实际实现是在 Stream.java 的 default 方法或 ReferencePipeline 的父类中委托)
// 例如 sorted() 会最终调用 SortedOps.makeRef(this)
// limit(n) 会最终调用 SliceOps.makeRef(this, 0, n)@Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}@Overridepublic final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);}@Overridepublic final Stream<P_OUT> limit(long maxSize) {if (maxSize < 0)throw new IllegalArgumentException(Long.toString(maxSize));return SliceOps.makeRef(this, 0, maxSize);}@Overridepublic final Stream<P_OUT> skip(long n) {if (n < 0)throw new IllegalArgumentException(Long.toString(n));if (n == 0)return this;elsereturn SliceOps.makeRef(this, n, -1);}@Overridepublic final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {return WhileOps.makeTakeWhileRef(this, predicate);}@Overridepublic final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {return WhileOps.makeDropWhileRef(this, predicate);}@Overridepublic final boolean allMatch(Predicate<? super P_OUT> predicate) {return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));}
有状态操作非常复杂,尤其是在并行环境下。它们需要收集所有(或部分)元素,进行全局处理,然后才能输出。这种操作是并行流中的“屏障”(Barrier)。将这些复杂逻辑封装在专门的 Ops
类(如 SortedOps
, DistinctOps
, SliceOps
)中,可以:
- 逻辑分离:让
ReferencePipeline
保持清晰,只负责流水线的构建和无状态操作的实现。 - 复用:这些
Ops
类可以为不同类型的流(ReferencePipeline
,IntPipeline
等)提供统一的实现策略。 - 集中处理复杂性:并行化、状态管理、缓冲区等所有复杂问题都在
Ops
类中解决。
内部类 Head
, StatelessOp
, StatefulOp
ReferencePipeline
内部定义了这些类来代表不同类型的管道阶段:
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
:- 代表 Stream 管道的源头。
- 它的构造函数接收
Spliterator
或Supplier<Spliterator>
。 - 它覆盖了
opIsStateful()
(返回false
) 和opWrapSink()
(直接将元素从Spliterator
推到Sink
)。
static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
:- 代表无状态的中间操作,如
filter
,map
。 - 它的构造函数接收上游管道和操作标志。
- 它覆盖了
opIsStateful()
(返回false
)。 - 核心在于其子类必须实现
opWrapSink(int flags, Sink<E_OUT> sink)
方法,该方法定义了如何将上游Sink<E_IN>
的元素处理并传递给下游Sink<E_OUT>
。我们上面看到的filter
和map
的实现就是通过匿名内部类继承StatelessOp
并实现opWrapSink
。
- 代表无状态的中间操作,如
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
:- 代表有状态的中间操作,如
distinct
,sorted
,limit
,skip
。 - 它覆盖了
opIsStateful()
(返回true
)。 - 有状态操作通常更复杂,因为它们可能需要看到所有元素才能产生输出,或者需要维护内部状态。
- 它们通常需要覆盖
opEvaluateParallel
和opEvaluateSequential
来提供并行和顺序执行的特定逻辑,因为它们可能无法简单地通过opWrapSink
来实现(例如sorted
需要收集所有元素才能排序)。
- 代表有状态的中间操作,如
实现 Stream
接口的终端操作
ReferencePipeline
本身不直接实现终端操作。终端操作的逻辑被封装在 TerminalOp
接口的实现中(例如 ForEachOp
, ReduceOp
, FindOp
)。
Stream
接口中的终端操作方法(如 forEach
, collect
)会创建相应的 TerminalOp
实例,然后调用 AbstractPipeline
提供的 evaluate()
方法来启动整个流水线的求值过程。
例如,forEach
的实现(ReferencePipeline
中) :
public void forEach(Consumer<? super P_OUT> action) {evaluate(ForEachOps.makeRef(action, false));}
它创建了一个 ForEachOp
,然后调用 evaluate
。evaluate
方法是 AbstractPipeline
的核心驱动引擎,它会负责获取 Spliterator
,构建 Sink
链,然后开始推送数据。
总结
ReferencePipeline
通过以下方式为用户提供了丰富的 Stream
特性:
- 继承骨架:继承
AbstractPipeline
,获得流水线构建、标志位管理、并行/顺序切换和终端操作驱动等通用能力。 - 填充细节:实现
AbstractPipeline
的抽象方法,为处理“对象引用”这种具体“形状”的数据流提供了必要的工具(如Node.Builder
,WrappingSpliterator
)。 - 实现操作:通过一个优雅且统一的模式(创建新阶段 + 实现
opWrapSink
)来实现Stream
接口中定义的各种中间操作。每个操作都返回一个新的ReferencePipeline
实例,从而形成链式调用。 - 定义管道阶段类型: 通过内部类
Head
,StatelessOp
,StatefulOp
来区分源、无状态中间操作和有状态中间操作。 - 委托终端操作:将终端操作的执行委托给
AbstractPipeline
的evaluate
方法,由它来统一驱动整个流水线的计算。
这种设计将流水线控制(AbstractPipeline
)和具体操作逻辑(ReferencePipeline
中的 opWrapSink
实现)完美地分离开来,是其强大、灵活和可扩展的根本原因。
ReferencePipeline.Head
ReferencePipeline.Head
是所有引用类型流(Stream<T>
)的起点。当通过 Stream.of(...)
、collection.stream()
等方法创建一个流时,其内部实现就是创建了一个 Head
类的实例,它代表了整个流式处理管道的源头。
/*** Source stage of a ReferencePipeline.** @param <E_IN> type of elements in the upstream source* @param <E_OUT> type of elements in produced by this stage* @since 1.8*/
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {// ...
}
static class Head<...>
:- 它是一个
static
静态内部类。这意味着Head
类的实例不依赖于外部ReferencePipeline
类的实例。这很合理,因为Head
本身就是管道的第一个环节,它没有外部类实例的概念。 - 它继承自
ReferencePipeline<E_IN, E_OUT>
,表明Head
本身就是一个功能完整的ReferencePipeline
。你可以对一个Head
对象调用map
,filter
等所有Stream
的方法。
- 它是一个
- 角色:
- 如 Javadoc 中所述,
Head
是ReferencePipeline
的 源阶段(Source stage)。它封装了流的原始数据源(例如,一个集合、数组或者生成函数),是所有后续操作(如map
,filter
)的基础。整个流管道就像一个链表,Head
就是这个链表的头节点。
- 如 Javadoc 中所述,
构造方法
Head
有两个构造函数,用于从不同的数据源创建流。
// ... existing code ...static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {/*** Constructor for the source stage of a Stream.** @param source {@code Supplier<Spliterator>} describing the stream* source* @param sourceFlags the source flags for the stream source, described* in {@link StreamOpFlag}*/Head(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}/*** Constructor for the source stage of a Stream.** @param source {@code Spliterator} describing the stream source* @param sourceFlags the source flags for the stream source, described* in {@link StreamOpFlag}*/Head(Spliterator<?> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}
// ... existing code ...
Head(Spliterator<?> source, ...)
: 这个构造函数接受一个Spliterator
(可分割迭代器)作为数据源。Spliterator
是 Java 8 中引入的,是流并行处理的基石。Head(Supplier<? extends Spliterator<?>> source, ...)
: 这个构造函数接受一个Spliterator
的Supplier
(提供者)。这允许延迟创建Spliterator
,直到流的终端操作被调用时才真正获取数据源,这是一种惰性求值的体现。
这两个构造函数都只是简单地调用了父类 AbstractPipeline
的构造函数,将数据源、源标志(sourceFlags
,如 SIZED
, ORDERED
等)和并行标志(parallel
)保存起来。
核心方法分析
Head
类重写了几个关键方法,其中一些是为了“禁止”调用,另一些则是为了“优化”。
禁止调用的方法
// ... existing code ...@Overridefinal boolean opIsStateful() {throw new UnsupportedOperationException();}@Overridefinal Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {throw new UnsupportedOperationException();}
// ... existing code ...
opIsStateful()
和opWrapSink(...)
: 这两个方法是为中间操作(Intermediate operations likemap
,filter
,sorted
)设计的。opIsStateful()
用来标识一个操作是否是有状态的(如sorted
需要看到所有元素才能排序)。opWrapSink(...)
是实现操作链的关键,它将下游操作的Sink
(接收器)包装成一个新的Sink
,以实现当前操作的逻辑。
- 为什么抛出异常? 因为
Head
是数据源,它不是一个“操作”。它没有上游,也不需要包装任何东西。它负责驱动整个流程的开始。因此,调用这两个方法在逻辑上是错误的,直接抛出异常可以防止不正确的内部使用。
优化终端操作
这是 Head
类中最有意思的部分。它为 forEach
和 forEachOrdered
提供了针对顺序流的短路优化。
// ... existing code ...// Optimized sequential terminal operations for the head of the pipeline@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEach(action);}}@Overridepublic void forEachOrdered(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEachOrdered(action);}}
// ... existing code ...
- 优化逻辑:
if (!isParallel())
: 首先检查流是否是并行的。- 如果是顺序流: 它会直接获取底层的
Spliterator
,并调用其forEachRemaining(action)
方法。这是一个巨大的优化。它完全绕过了 Stream 内部复杂的Sink
链机制。对于一个简单的collection.stream().forEach(...)
,它几乎等同于直接在集合上写for-each
循环,开销极小。 - 如果是并行流: 它会调用
super.forEach(action)
,回退到ReferencePipeline
中定义的通用处理逻辑。这个通用逻辑会构建完整的Sink
链,并使用 Fork/Join 框架来并行执行任务。
StatelessOp
和 StatefulOp
它们是 Stream API 设计模式的精髓体现,通过模板方法模式,为两类截然不同的中间操作(无状态和有状态)定义了清晰的、可复用的流程和契约。
首先,StatelessOp
和 StatefulOp
都是:
- 静态内部类:它们属于
ReferencePipeline
,但可以独立于外部类的实例存在。 - 继承
ReferencePipeline
: 这意味着它们自身就是一个Pipeline
阶段,拥有完整的流水线能力。 - 为中间操作设计: 它们的构造函数
(AbstractPipeline<?, E_IN, ?> upstream, ...)
表明,它们必须链接在一个已存在的上游(upstream
)之后,用于延长流水线,而不能作为源头。
StatelessOp<E_IN, E_OUT>
:无状态操作的模板
这个类是为 map
, filter
, peek
, flatMap
等无状态操作设计的模板。
定义的流程与契约:
构造流程:
super(upstream, opFlags)
: 调用父类构造函数,将自己正确地链接到流水线中。assert upstream.getOutputShape() == inputShape
: 这是一个重要的断言,确保上游输出的元素类型(Shape
)和当前操作期望的输入类型一致。
核心契约:
opIsStateful()
@Override final boolean opIsStateful() {return false; }
- 这是它最核心的特征。它明确地、最终地(
final
)声明:“我是一个无状态操作”。 - 这个返回
false
的声明会影响整个流水线的执行策略。当终端操作评估流水线时,看到这个标志,就知道这个阶段的处理非常“轻量”,可以简单地通过Sink
链式包装来懒加载执行,不需要任何特殊的屏障或缓冲。
- 这是它最核心的特征。它明确地、最终地(
未定义的契约:
opWrapSink
StatelessOp
是一个抽象类,但它自身没有定义任何新的抽象方法。它依赖于继承自AbstractPipeline
的抽象方法opWrapSink
。 任何一个具体的无状态操作(比如filter
的实现)在继承StatelessOp
时,必须实现opWrapSink
方法。// 以 filter 为例 return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {// ... 返回一个包装了 filter 逻辑的 Sink} };
这个模式强制具体的无状态操作去定义“如何将自己的操作逻辑包装成一个
Sink
”。
总结 StatelessOp
的流程:它定义了一个“无状态中间操作”的标准化流程——通过构造函数加入流水线,通过 opIsStateful()
声明自己是无状态的,并强制子类通过实现 opWrapSink
来提供具体的处理逻辑。
StatefulOp<E_IN, E_OUT>
:有状态操作的模板
这个类是为 sorted
, distinct
, limit
, skip
等有状态操作设计的模板。这些操作通常需要查看多个甚至所有元素才能产出结果。
定义的流程与契约:
构造流程: 与
StatelessOp
完全相同,也是将自己链接到流水线中。核心契约:
opIsStateful()
@Override final boolean opIsStateful() {return true; }
- 与
StatelessOp
相反,它明确声明:“我是一个有状态操作”。 - 这个
true
的返回值是一个关键信号。当sourceSpliterator
方法在准备并行执行时,看到这个标志,就会知道这里是一个计算屏障 (Barrier)。它会停止简单的Sink
包装,转而调用opEvaluateParallelLazy
来“引爆”这个屏障,提前对数据进行全局处理。
- 与
新的核心契约:
opEvaluateParallel
@Override abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,Spliterator<P_IN> spliterator,IntFunction<E_OUT[]> generator);
- 这是
StatefulOp
引入的新抽象方法【父类Abstract Pipeline实现为抛出一个异常】。它强制所有具体的有状态操作(如SortedOps
,DistinctOps
)必须提供一个并行计算的实现。 - 这个方法要求子类接收上游的
spliterator
,然后通过并行计算,将所有处理结果收集到一个Node
(一个内部数据容器)中并返回。这正是“屏障”操作的本质——消耗上游所有数据,产出一个包含完整结果的内部数据集。
- 这是
总结 StatefulOp
的流程:它定义了一个“有状态中间操作”的标准化流程——通过构造函数加入流水线,通过 opIsStateful()
声明自己是有状态的(从而触发屏障机制),并强制子类通过实现 opEvaluateParallel
来提供具体的、重量级的并行处理和数据收集逻辑。
StatelessOp
和 StatefulOp
这两个“算子”的意义是什么?
既然每个操作都要自己实现,为什么不直接 new ReferencePipeline<...>() { ... }
,而是要多此一举通过 StatelessOp
和 StatefulOp
这两个中间层呢?
这就是模板方法模式的精髓所在,它们的意义是:定义契约、分离职责、简化实现。
可以把 StatelessOp
和 StatefulOp
想象成两种不同类型的 “合同模板”。
1. StatelessOp
(无状态操作合同模板)
它承诺并强制了一件事:
@Override final boolean opIsStateful() {return false; }
任何使用
StatelessOp
模板创建的操作,都自动被打上“无状态”的标签。这对于 Stream 框架进行优化至关重要。框架看到这个false
,就知道这个操作很简单,不需要缓冲,可以一个一个元素地流式处理。它要求实现者做一件事: 它继承了
opWrapSink
这个抽象方法,所以任何new StatelessOp<...>()
的匿名子类都必须提供opWrapSink
的实现。合同内容: “我(
StatelessOp
)帮你声明了你是‘无状态’的,你(匿名子类)只需要告诉我你的具体处理逻辑(实现opWrapSink
)就行了。”
2. StatefulOp
(有状态操作合同模板)
它承诺并强制了另一件事:
@Override final boolean opIsStateful() {return true; }
任何使用
StatefulOp
模板创建的操作,都自动被打上“有状态”的标签。框架看到这个true
,就知道这是个复杂操作,尤其在并行计算时,它是一个**“计算屏障”**,需要特殊处理(比如先把所有元素收集起来)。它要求实现者做另一件更复杂的事: 它引入了一个新的抽象方法
opEvaluateParallel
。abstract <P_IN> Node<E_OUT> opEvaluateParallel(...);
合同内容:“我(
StatefulOp
)帮你声明了你是‘有状态’的,这很复杂。所以你(子类)不能只提供一个简单的opWrapSink
,你必须提供一套完整的并行计算方案(实现opEvaluateParallel
),告诉我如何收集所有数据并产出一个最终结果集(Node
)。”
StatelessOp
和 StatefulOp
的意义在于:
- 分类与抽象:将所有中间操作清晰地分为“无状态”和“有状态”两大类。
- 强制契约:通过
opIsStateful()
方法,强制每个操作都明确自己的“身份”,供框架决策。 - 定义模板:为两类操作提供了不同的实现“模板”。无状态操作只需要实现轻量级的
opWrapSink
,而有状态操作必须实现重量级的opEvaluateParallel
。
这种设计使得 ReferencePipeline
的代码非常干净:map
/filter
等无状态操作的实现模式高度一致;sorted
/distinct
等有状态操作则统一委托给专门的 Ops
类处理。整个架构清晰、健壮且易于扩展。