当前位置: 首页 > news >正文

Java Stream核心:ReferencePipeline解析

ReferencePipeline

ReferencePipeline 继承了 AbstractPipeline,并实现了 Stream 接口。它的核心作用是:

  1. 承上启下:作为 AbstractPipeline 的直接子类,它实现了父类中所有与“形状”相关的抽象方法,为处理对象引用类型(非 intlongdouble 等基本类型)的流提供了具体的实现。
  2. 实现 Stream 接口:它实现了 Stream 接口中定义的所有中间操作(如 mapfilterflatMap)和终端操作(如 forEachcollect)。

下面我们来具体分析它是如何实现这些特性的。

实现 AbstractPipeline 的抽象方法(“形状”相关)

ReferencePipeline 首先要做的,就是告诉 AbstractPipeline 这个骨架,如何处理“对象引用”这种形状的数据。

// ... existing code ...// Shape-specific methods@Overridefinal StreamShape getOutputShape() {return StreamShape.REFERENCE;}@Overridefinal <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,Spliterator<P_IN> spliterator,boolean flattenTree,IntFunction<P_OUT[]> generator) {return Nodes.collect(helper, spliterator, flattenTree, generator);}@Overridefinal <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,Supplier<Spliterator<P_IN>> supplier,boolean isParallel) {return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);}@Overridefinal Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {return new StreamSpliterators.DelegatingSpliterator<>(supplier);}@Overridefinal boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
// ... existing code ...}@Overridefinal Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {return Nodes.builder(exactSizeIfKnown, generator);}
// ... existing code ...
  • getOutputShape(): 明确告诉框架,这个管道流出的是对象引用
  • evaluateToNode(...): 当需要将流的结果收集到一个 Node 中时(例如在并行计算中),它委托给 Nodes.collect() 这个工具方法来完成。
  • wrap(...): 当需要将一个上游 Spliterator 和下游操作包装成一个新的 Spliterator 时,它创建一个 StreamSpliterators.WrappingSpliterator 实例。这个包装器就是实现延迟计算的关键。
  • lazySpliterator(...): 当从 Supplier 创建流时,它创建一个 StreamSpliterators.DelegatingSpliterator,实现懒加载。
  • makeNodeBuilder(...): 当需要构建一个 Node 时,它委托给 Nodes.builder() 来创建一个适用于对象引用的 Node.Builder

通过实现这些方法,ReferencePipeline 就具备了处理对象类型流所需的基础设施。

实现 Stream 接口的中间操作

这是 ReferencePipeline 最核心的部分,它为用户提供了丰富的流操作方法。其实现模式高度一致,堪称典范。我们以 filter 和 map 为例:

filter(Predicate)

// ... existing code ...@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}@Overridepublic void accept(P_OUT u) {if (predicate.test(u))downstream.accept(u);}};}};}
// ... existing code ...
  1. 返回新阶段: 它 new 了一个 StatelessOp 对象。StatelessOp 是 ReferencePipeline 的一个静态内部类,它本身也是一个 ReferencePipeline。构造函数 new StatelessOp<>(this, ...) 将当前阶段 (this) 作为上游,从而将新的 filter 操作链接到流水线的末端。
  2. 传递标志位: 它告诉框架,filter 操作是一个 NOT_SIZED 操作,因为过滤后元素的数量是不确定的。
  3. 实现 opWrapSink: 这是魔法发生的地方。它重写了 AbstractPipeline 中的核心抽象方法 opWrapSink
    • 目的: 这个方法的任务是“包装”下游传来的 Sink
    • 实现: 它创建了一个 Sink.ChainedReference。这个新的 Sink 在它的 accept 方法中执行了 filter 的核心逻辑:if (predicate.test(u))。只有当元素满足条件时,它才会调用下游 downstream.accept(u),将元素传递下去。
    • begin 方法: 它调用 downstream.begin(-1),通知下游数据量未知。

map(Function)

// ... existing code ...@Overridepublic final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));}};}};}
// ... existing code ...

map 的实现与 filter 几乎完全相同,只是:

  • 标志位不同:map 操作可能会破坏原有的排序和独特性,所以它传递了 NOT_SORTED | NOT_DISTINCT
  • opWrapSink 的逻辑不同:它的 accept 方法执行的是 map 的逻辑:downstream.accept(mapper.apply(u)),即将转换后的新元素传递给下游。

几乎所有的无状态中间操作(filtermappeek 等)都遵循这个模式:

  1. 创建一个新的 StatelessOp 阶段,并链接到当前流水线。
  2. 提供正确的操作标志位。
  3. 在 opWrapSink 中创建一个新的 Sink,该 Sink 封装了当前操作的业务逻辑,并负责将处理结果传递给下游的 Sink

flatMap

flatMap 的实现相对复杂一些,因为它需要处理由 mapper 函数生成的内层 Stream。

ReferencePipeline.java

// ...@Overridepublic final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {Objects.requireNonNull(mapper);// 返回一个新的 StatelessOp,其 opWrapSink 方法会创建一个特殊的 Sinkreturn new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {// ... (内部创建了一个实现了 Sink<P_OUT> 的 FlatMap 类) ...// FlatMap 类的 accept(P_OUT e) 方法会:// 1. 调用 mapper.apply(e) 得到一个 Stream<? extends R> result// 2. 如果 result 不为 null,则遍历 result 中的每个元素 r// 3. 对每个 r 调用下游 sink.accept(r)// 它还需要处理短路 (cancellationRequested)boolean shorts = isShortCircuitingPipeline();final class FlatMap implements Sink<P_OUT>, Predicate<R> {boolean cancel;@Override public void begin(long size) { sink.begin(-1); }@Override public void end() { sink.end(); }@Overridepublic void accept(P_OUT e) {try (Stream<? extends R> result = mapper.apply(e)) { // try-with-resources 确保内部流被关闭if (result != null) {if (shorts) // 如果整个管道是短路的// 使用 allMatch 和 Predicate<R> 来迭代内部流,并允许提前终止result.sequential().allMatch(this);else// 否则,简单地将内部流的每个元素传递给下游 sinkresult.sequential().forEach(sink);}}}@Overridepublic boolean cancellationRequested() {// 如果当前 sink 或下游 sink 请求取消,则返回 truereturn cancel || (cancel |= sink.cancellationRequested());}// 这个 test 方法是为 shorts && result.sequential().allMatch(this) 服务的// allMatch 会在 Predicate.test 返回 false 时停止@Overridepublic boolean test(R output) {if (!cancel) {sink.accept(output); // 将元素传递给下游// 如果下游请求取消,则设置 cancel 标志并返回 false 以停止 allMatchreturn !(cancel |= sink.cancellationRequested());} else {return false; // 如果已经取消,则返回 false}}}return new FlatMap();}};}
// ...
  • flatMap 的 Sink 实现 (FlatMap 内部类) 稍微复杂:
    • 它的 accept(P_OUT e) 方法接收上游元素 e
    • 它调用 mapper.apply(e) 来获取一个内层 Stream<? extends R>
    • 然后它遍历这个内层 Stream,并将内层流的每个元素传递给下游的 sink
    • 它还需要正确处理短路逻辑,如果下游 sink 请求取消,内层流的遍历也应该停止。
    • try-with-resources 用于确保由 mapper 返回的流被正确关闭。

Stream 接口实现:有状态操作 (Stateful Operations)

对于有状态操作,如 distinctsortedlimitskipReferencePipeline 并没有直接实现它们的复杂逻辑。

// 在 Stream.java 中定义
Stream<T> sorted();
Stream<T> distinct();// 在 ReferencePipeline.java 中,它们通常被委托给专门的 Ops 类
// (实际实现是在 Stream.java 的 default 方法或 ReferencePipeline 的父类中委托)
// 例如 sorted() 会最终调用 SortedOps.makeRef(this)
// limit(n) 会最终调用 SliceOps.makeRef(this, 0, n)@Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}@Overridepublic final Stream<P_OUT> sorted() {return SortedOps.makeRef(this);}@Overridepublic final Stream<P_OUT> limit(long maxSize) {if (maxSize < 0)throw new IllegalArgumentException(Long.toString(maxSize));return SliceOps.makeRef(this, 0, maxSize);}@Overridepublic final Stream<P_OUT> skip(long n) {if (n < 0)throw new IllegalArgumentException(Long.toString(n));if (n == 0)return this;elsereturn SliceOps.makeRef(this, n, -1);}@Overridepublic final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {return WhileOps.makeTakeWhileRef(this, predicate);}@Overridepublic final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {return WhileOps.makeDropWhileRef(this, predicate);}@Overridepublic final boolean allMatch(Predicate<? super P_OUT> predicate) {return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));}

有状态操作非常复杂,尤其是在并行环境下。它们需要收集所有(或部分)元素,进行全局处理,然后才能输出。这种操作是并行流中的“屏障”(Barrier)。将这些复杂逻辑封装在专门的 Ops 类(如 SortedOpsDistinctOpsSliceOps)中,可以:

  • 逻辑分离:让 ReferencePipeline 保持清晰,只负责流水线的构建和无状态操作的实现。
  • 复用:这些 Ops 类可以为不同类型的流(ReferencePipelineIntPipeline 等)提供统一的实现策略。
  • 集中处理复杂性:并行化、状态管理、缓冲区等所有复杂问题都在 Ops 类中解决。

内部类 HeadStatelessOpStatefulOp

ReferencePipeline 内部定义了这些类来代表不同类型的管道阶段:

  • static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>:

    • 代表 Stream 管道的源头。
    • 它的构造函数接收 Spliterator 或 Supplier<Spliterator>
    • 它覆盖了 opIsStateful() (返回 false) 和 opWrapSink() (直接将元素从 Spliterator 推到 Sink)。
  • static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>:

    • 代表无状态的中间操作,如 filtermap
    • 它的构造函数接收上游管道和操作标志。
    • 它覆盖了 opIsStateful() (返回 false)。
    • 核心在于其子类必须实现 opWrapSink(int flags, Sink<E_OUT> sink) 方法,该方法定义了如何将上游 Sink<E_IN> 的元素处理并传递给下游 Sink<E_OUT>。我们上面看到的 filter 和 map 的实现就是通过匿名内部类继承 StatelessOp 并实现 opWrapSink
  • abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>:

    • 代表有状态的中间操作,如 distinctsortedlimitskip
    • 它覆盖了 opIsStateful() (返回 true)。
    • 有状态操作通常更复杂,因为它们可能需要看到所有元素才能产生输出,或者需要维护内部状态。
    • 它们通常需要覆盖 opEvaluateParallel 和 opEvaluateSequential 来提供并行和顺序执行的特定逻辑,因为它们可能无法简单地通过 opWrapSink 来实现(例如 sorted 需要收集所有元素才能排序)。

 实现 Stream 接口的终端操作

ReferencePipeline 本身不直接实现终端操作。终端操作的逻辑被封装在 TerminalOp 接口的实现中(例如 ForEachOpReduceOpFindOp)。

Stream 接口中的终端操作方法(如 forEachcollect)会创建相应的 TerminalOp 实例,然后调用 AbstractPipeline 提供的 evaluate() 方法来启动整个流水线的求值过程。

例如,forEach 的实现(ReferencePipeline 中) :

    public void forEach(Consumer<? super P_OUT> action) {evaluate(ForEachOps.makeRef(action, false));}

它创建了一个 ForEachOp,然后调用 evaluateevaluate 方法是 AbstractPipeline 的核心驱动引擎,它会负责获取 Spliterator,构建 Sink 链,然后开始推送数据。

总结

ReferencePipeline 通过以下方式为用户提供了丰富的 Stream 特性:

  1. 继承骨架:继承 AbstractPipeline,获得流水线构建、标志位管理、并行/顺序切换和终端操作驱动等通用能力。
  2. 填充细节:实现 AbstractPipeline 的抽象方法,为处理“对象引用”这种具体“形状”的数据流提供了必要的工具(如 Node.BuilderWrappingSpliterator)。
  3. 实现操作:通过一个优雅且统一的模式(创建新阶段 + 实现 opWrapSink)来实现 Stream 接口中定义的各种中间操作。每个操作都返回一个新的 ReferencePipeline 实例,从而形成链式调用。
  4. 定义管道阶段类型: 通过内部类 HeadStatelessOpStatefulOp 来区分源、无状态中间操作和有状态中间操作。
  5. 委托终端操作:将终端操作的执行委托给 AbstractPipeline 的 evaluate 方法,由它来统一驱动整个流水线的计算。

这种设计将流水线控制AbstractPipeline)和具体操作逻辑ReferencePipeline 中的 opWrapSink 实现)完美地分离开来,是其强大、灵活和可扩展的根本原因。

ReferencePipeline.Head

ReferencePipeline.Head 是所有引用类型流(Stream<T>)的起点。当通过 Stream.of(...)collection.stream() 等方法创建一个流时,其内部实现就是创建了一个 Head 类的实例,它代表了整个流式处理管道的源头。

/*** Source stage of a ReferencePipeline.** @param <E_IN> type of elements in the upstream source* @param <E_OUT> type of elements in produced by this stage* @since 1.8*/
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {// ...
}
  • static class Head<...>:
    • 它是一个 static 静态内部类。这意味着 Head 类的实例不依赖于外部 ReferencePipeline 类的实例。这很合理,因为 Head 本身就是管道的第一个环节,它没有外部类实例的概念。
    • 它继承自 ReferencePipeline<E_IN, E_OUT>,表明 Head 本身就是一个功能完整的 ReferencePipeline。你可以对一个 Head 对象调用 mapfilter 等所有 Stream 的方法。
  • 角色:
    • 如 Javadoc 中所述,Head 是 ReferencePipeline 的 源阶段(Source stage)。它封装了流的原始数据源(例如,一个集合、数组或者生成函数),是所有后续操作(如 mapfilter)的基础。整个流管道就像一个链表,Head 就是这个链表的头节点。

构造方法

Head 有两个构造函数,用于从不同的数据源创建流。

// ... existing code ...static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {/*** Constructor for the source stage of a Stream.** @param source {@code Supplier<Spliterator>} describing the stream*               source* @param sourceFlags the source flags for the stream source, described*                    in {@link StreamOpFlag}*/Head(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}/*** Constructor for the source stage of a Stream.** @param source {@code Spliterator} describing the stream source* @param sourceFlags the source flags for the stream source, described*                    in {@link StreamOpFlag}*/Head(Spliterator<?> source,int sourceFlags, boolean parallel) {super(source, sourceFlags, parallel);}
// ... existing code ...
  • Head(Spliterator<?> source, ...): 这个构造函数接受一个 Spliterator(可分割迭代器)作为数据源。Spliterator 是 Java 8 中引入的,是流并行处理的基石。
  • Head(Supplier<? extends Spliterator<?>> source, ...): 这个构造函数接受一个 Spliterator 的 Supplier(提供者)。这允许延迟创建 Spliterator,直到流的终端操作被调用时才真正获取数据源,这是一种惰性求值的体现。

这两个构造函数都只是简单地调用了父类 AbstractPipeline 的构造函数,将数据源、源标志(sourceFlags,如 SIZEDORDERED 等)和并行标志(parallel)保存起来。

核心方法分析

Head 类重写了几个关键方法,其中一些是为了“禁止”调用,另一些则是为了“优化”。

禁止调用的方法

// ... existing code ...@Overridefinal boolean opIsStateful() {throw new UnsupportedOperationException();}@Overridefinal Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {throw new UnsupportedOperationException();}
// ... existing code ...
  • opIsStateful() 和 opWrapSink(...): 这两个方法是为中间操作(Intermediate operations like mapfiltersorted)设计的。
    • opIsStateful() 用来标识一个操作是否是有状态的(如 sorted 需要看到所有元素才能排序)。
    • opWrapSink(...) 是实现操作链的关键,它将下游操作的 Sink(接收器)包装成一个新的 Sink,以实现当前操作的逻辑。
  • 为什么抛出异常? 因为 Head 是数据源,它不是一个“操作”。它没有上游,也不需要包装任何东西。它负责驱动整个流程的开始。因此,调用这两个方法在逻辑上是错误的,直接抛出异常可以防止不正确的内部使用。

优化终端操作

这是 Head 类中最有意思的部分。它为 forEach 和 forEachOrdered 提供了针对顺序流的短路优化。

// ... existing code ...// Optimized sequential terminal operations for the head of the pipeline@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEach(action);}}@Overridepublic void forEachOrdered(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {super.forEachOrdered(action);}}
// ... existing code ...
  • 优化逻辑:
    1. if (!isParallel()): 首先检查流是否是并行的。
    2. 如果是顺序流: 它会直接获取底层的 Spliterator,并调用其 forEachRemaining(action) 方法。这是一个巨大的优化。它完全绕过了 Stream 内部复杂的 Sink 链机制。对于一个简单的 collection.stream().forEach(...),它几乎等同于直接在集合上写 for-each 循环,开销极小。
    3. 如果是并行流: 它会调用 super.forEach(action),回退到 ReferencePipeline 中定义的通用处理逻辑。这个通用逻辑会构建完整的 Sink 链,并使用 Fork/Join 框架来并行执行任务。

StatelessOp 和 StatefulOp

它们是 Stream API 设计模式的精髓体现,通过模板方法模式,为两类截然不同的中间操作(无状态和有状态)定义了清晰的、可复用的流程和契约。

首先,StatelessOp 和 StatefulOp 都是:

  1. 静态内部类:它们属于 ReferencePipeline,但可以独立于外部类的实例存在。
  2. 继承 ReferencePipeline: 这意味着它们自身就是一个 Pipeline 阶段,拥有完整的流水线能力。
  3. 为中间操作设计: 它们的构造函数 (AbstractPipeline<?, E_IN, ?> upstream, ...) 表明,它们必须链接在一个已存在的上游(upstream)之后,用于延长流水线,而不能作为源头。

StatelessOp<E_IN, E_OUT>:无状态操作的模板

这个类是为 mapfilterpeekflatMap 等无状态操作设计的模板。

定义的流程与契约:

  1. 构造流程:

    • super(upstream, opFlags): 调用父类构造函数,将自己正确地链接到流水线中。
    • assert upstream.getOutputShape() == inputShape: 这是一个重要的断言,确保上游输出的元素类型(Shape)和当前操作期望的输入类型一致。
  2. 核心契约:opIsStateful()

    @Override
    final boolean opIsStateful() {return false;
    }
    
    • 这是它最核心的特征。它明确地、最终地(final)声明:“我是一个无状态操作”
    • 这个返回 false 的声明会影响整个流水线的执行策略。当终端操作评估流水线时,看到这个标志,就知道这个阶段的处理非常“轻量”,可以简单地通过 Sink 链式包装来懒加载执行,不需要任何特殊的屏障或缓冲。
  3. 未定义的契约:opWrapSink StatelessOp 是一个抽象类,但它自身没有定义任何新的抽象方法。它依赖于继承自 AbstractPipeline 的抽象方法 opWrapSink。 任何一个具体的无状态操作(比如 filter 的实现)在继承 StatelessOp 时,必须实现 opWrapSink 方法。

    // 以 filter 为例
    return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {// ... 返回一个包装了 filter 逻辑的 Sink}
    };
    

    这个模式强制具体的无状态操作去定义“如何将自己的操作逻辑包装成一个 Sink”。

总结 StatelessOp 的流程:它定义了一个“无状态中间操作”的标准化流程——通过构造函数加入流水线,通过 opIsStateful() 声明自己是无状态的,并强制子类通过实现 opWrapSink 来提供具体的处理逻辑。


StatefulOp<E_IN, E_OUT>:有状态操作的模板

这个类是为 sorteddistinctlimitskip 等有状态操作设计的模板。这些操作通常需要查看多个甚至所有元素才能产出结果。

定义的流程与契约:

  1. 构造流程: 与 StatelessOp 完全相同,也是将自己链接到流水线中。

  2. 核心契约:opIsStateful()

    @Override
    final boolean opIsStateful() {return true;
    }
    
    • 与 StatelessOp 相反,它明确声明:“我是一个有状态操作”
    • 这个 true 的返回值是一个关键信号。当 sourceSpliterator 方法在准备并行执行时,看到这个标志,就会知道这里是一个计算屏障 (Barrier)。它会停止简单的 Sink 包装,转而调用 opEvaluateParallelLazy 来“引爆”这个屏障,提前对数据进行全局处理。
  3. 新的核心契约:opEvaluateParallel

    @Override
    abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,Spliterator<P_IN> spliterator,IntFunction<E_OUT[]> generator);
    
    • 这是 StatefulOp 引入的新抽象方法【父类Abstract Pipeline实现为抛出一个异常】。它强制所有具体的有状态操作(如 SortedOpsDistinctOps)必须提供一个并行计算的实现
    • 这个方法要求子类接收上游的 spliterator,然后通过并行计算,将所有处理结果收集到一个 Node(一个内部数据容器)中并返回。这正是“屏障”操作的本质——消耗上游所有数据,产出一个包含完整结果的内部数据集。

总结 StatefulOp 的流程:它定义了一个“有状态中间操作”的标准化流程——通过构造函数加入流水线,通过 opIsStateful() 声明自己是有状态的(从而触发屏障机制),并强制子类通过实现 opEvaluateParallel 来提供具体的、重量级的并行处理和数据收集逻辑。

StatelessOp 和 StatefulOp 这两个“算子”的意义是什么?

既然每个操作都要自己实现,为什么不直接 new ReferencePipeline<...>() { ... },而是要多此一举通过 StatelessOp 和 StatefulOp 这两个中间层呢?

这就是模板方法模式的精髓所在,它们的意义是:定义契约、分离职责、简化实现。

可以把 StatelessOp 和 StatefulOp 想象成两种不同类型的 “合同模板”。

1. StatelessOp (无状态操作合同模板)

  • 它承诺并强制了一件事

    @Override
    final boolean opIsStateful() {return false;
    }
    

    任何使用 StatelessOp 模板创建的操作,都自动被打上“无状态”的标签。这对于 Stream 框架进行优化至关重要。框架看到这个 false,就知道这个操作很简单,不需要缓冲,可以一个一个元素地流式处理。

  • 它要求实现者做一件事: 它继承了 opWrapSink 这个抽象方法,所以任何 new StatelessOp<...>() 的匿名子类都必须提供 opWrapSink 的实现。

    合同内容: “我(StatelessOp)帮你声明了你是‘无状态’的,你(匿名子类)只需要告诉我你的具体处理逻辑(实现 opWrapSink)就行了。”

2. StatefulOp (有状态操作合同模板)

  • 它承诺并强制了另一件事

    @Override
    final boolean opIsStateful() {return true;
    }
    

    任何使用 StatefulOp 模板创建的操作,都自动被打上“有状态”的标签。框架看到这个 true,就知道这是个复杂操作,尤其在并行计算时,它是一个**“计算屏障”**,需要特殊处理(比如先把所有元素收集起来)。

  • 它要求实现者做另一件更复杂的事: 它引入了一个新的抽象方法 opEvaluateParallel

    abstract <P_IN> Node<E_OUT> opEvaluateParallel(...);
    

    合同内容:“我(StatefulOp)帮你声明了你是‘有状态’的,这很复杂。所以你(子类)不能只提供一个简单的 opWrapSink,你必须提供一套完整的并行计算方案(实现 opEvaluateParallel),告诉我如何收集所有数据并产出一个最终结果集(Node)。”

StatelessOp 和 StatefulOp 的意义在于:

  1. 分类与抽象:将所有中间操作清晰地分为“无状态”和“有状态”两大类。
  2. 强制契约:通过 opIsStateful() 方法,强制每个操作都明确自己的“身份”,供框架决策。
  3. 定义模板:为两类操作提供了不同的实现“模板”。无状态操作只需要实现轻量级的 opWrapSink,而有状态操作必须实现重量级的 opEvaluateParallel

这种设计使得 ReferencePipeline 的代码非常干净:map/filter 等无状态操作的实现模式高度一致;sorted/distinct 等有状态操作则统一委托给专门的 Ops 类处理。整个架构清晰、健壮且易于扩展。

http://www.lryc.cn/news/604736.html

相关文章:

  • 【WPS】邮件合并教程\Excel批量写入数据进Word模板
  • 滚珠导轨在电子制造中的流畅性优势
  • 新零售“实—虚—合”逻辑下的技术赋能与模式革新:基于开源AI大模型、AI智能名片与S2B2C商城小程序源码的研究
  • 洛谷 P11230:[CSP-J 2024 T4] 接龙 ← 图论+动态规划
  • 北京-4年功能测试2年空窗-报培训班学测开-第六十四天-准备面试项目(焦虑)-同学开始面试
  • 汽车免拆诊断案例 | 免拆诊断发动机起动困难故障2例
  • Linux730 tr:-d /-s;sort:-r,-n,-R,-o,-t,-k,-u;bash;cut:-d,-c;tee -a;uniq -c -i
  • VS Code中如何关闭Github Copilot
  • 深度学习-丢弃法 Dropout
  • MySQL索引和事务笔记
  • 开源 Arkts 鸿蒙应用 开发(十三)音频--MP3播放
  • WPFC#超市管理系统(3)商品管理
  • 【科研绘图系列】R语言绘制绝对量柱状堆积图+环形图数量统计+特数量标注
  • 潇洒郎: Kafka Ubuntu 安装部署,命令行或者python生产数据与消费数据(kafka-python)
  • 【选型】HK32L088 与 STM32F0/L0 系列 MCU 参数对比与选型建议(ST 原厂 vs 国产芯片)
  • 2025年6月数据挖掘顶刊TKDE研究热点有哪些?
  • 传输层协议UDP与TCP
  • 周期滤波策略
  • AbMole小课堂丨bFGF(FGF-2):维持干细胞培养、驱动类器官构建与细胞分化
  • 容器与虚拟机的本质差异:从资源隔离到网络存储机制
  • 如何使用 Apache Ignite 作为 Spring 框架的缓存(Spring Cache)后端
  • GitPython02-Git使用方式
  • RHCA - CL260 | Day03:配置 RHCS 集群
  • 将 qt 构建为静态库
  • BGP高级特性之正则表达式
  • vue npm install卡住没反应
  • ISO 26262 汽车功能安全(腾讯混元)
  • 在 CentOS 系统上安装 Docker
  • Kotlin -> Kotlin Lambda 表达式与 Function 接口的关系
  • 深入理解 Kotlin Flow:异步数据流处理的艺术