当前位置: 首页 > news >正文

Java stream 并发问题

在并行流中,可能各个线程处理的是同一个sink输出,导致并发问题。

forEach (无序并行):将同步责任交给用户

对于 stream.parallel().forEach(action)多个线程确实会并发地调用同一个 Sink 实例的 accept 方法。

我们来看 ForEachOp 的实现:

// ... existing code .../** Implementation class for reference streams */static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}}
// ... existing code ...

可以看到,accept(T t) 方法直接调用了用户传入的 consumer.accept(t)。它自身没有任何同步措施

这就是 Stream 框架的“契约”forEach 是一个非常底层的操作,它追求极致的性能,因此把线程安全的责任完全交给了开发者。如果你在 forEach 的 Consumer 中操作一个共享的可变对象(比如往一个普通的 ArrayList 中添加元素),你必须自己处理同步,否则就会出现竞态条件、数据丢失或抛出 ConcurrentModificationException 等问题。

正确(但通常不推荐)的做法是:

List<String> sharedList = Collections.synchronizedList(new ArrayList<>());
stream.parallel().forEach(sharedList::add);

collect:框架负责线程安全(推荐方式)

这才是并行收集数据的正确且高效的方式。collect 操作远比 forEach 聪明,它专门设计用来解决并发问题。

collect 操作需要三个函数:

  1. Supplier (供应器)() -> new ArrayList<>()
  2. Accumulator (累加器)(list, item) -> list.add(item)
  3. Combiner (组合器)(list1, list2) -> { list1.addAll(list2); return list1; }

在并行执行时,collect 的工作流程如下:

  1. 分裂ForkJoinPool 将任务分裂给多个线程。
  2. 供应每个线程都会调用 Supplier 来创建自己私有的、局部的结果容器。例如,线程A得到 listA,线程B得到 listB。它们操作的不是同一个 ArrayList
  3. 累加:每个线程使用 Accumulator 将自己负责的元素累加到各自的局部容器中。线程A往 listA 里加,线程B往 listB 里加。因为操作的是线程私有对象,所以完全没有并发问题,速度极快。
  4. 组合:当所有线程都完成了自己的部分后,框架会使用 Combiner 将所有线程的局部结果合并成一个最终结果。例如,执行 listA.addAll(listB)。这个合并过程可能是串行的,也可能是分层并行的。

通过这种“分头累加,最后合并”的策略,collect 完美地避开了在核心并行阶段操作共享可变状态的问题,从而既保证了线程安全,又实现了高并发。

forEachOrdered (有序并行):通过缓冲和串行消费来保证安全

forEachOrdered 为了保证顺序,会先让各个并行任务处理数据并缓冲在各自的 Node 对象里。

最终,当轮到某个任务消费它的结果时,它是在一个确定的“happens-before”关系链中被触发的。这意味着对用户 action 的调用,实际上是串行化的,一个任务消费完了才会轮到下一个。因此,它也从根本上避免了对同一个 Sink 的并发写入问题。

总结

  • forEach:最快、最底层,但不安全。它把同步的烂摊子留给了你。如果你想并行收集到集合里,几乎永远都不应该用它。
  • collect并行收集的正确姿势。通过“本地容器+最终合并”的策略,由框架优雅地解决了并发问题,既安全又高效。
  • forEachOrdered:为了保证顺序,其最终消费阶段是串行化的,因此也是线程安全的。

http://www.lryc.cn/news/604509.html

相关文章:

  • Redis:缓存雪崩、穿透、击穿的技术解析和实战方案
  • 矩阵指数函数 e^A
  • 如何利用 Redis 的原子操作(INCR, DECR)实现分布式计数器?
  • 微算法科技MLGO突破性的监督量子分类器:纠缠辅助训练算法为量子机器学习开辟新天地
  • 代码随想录算法训练营第五十五天|图论part5
  • Python设计模式详解:策略模式(Strategy Pattern)实战指南
  • OpenBayes 教程上新丨仅激活 3B 参数可媲美 GPT-4o,Qwen3 深夜更新,一手实测来了!
  • 代码随想录day50图论1
  • Apache Ignite 与 Spring Boot 集成
  • 学习游戏制作记录(冻结敌人时间与黑洞技能)7.30
  • nginx安装配置Lua模块的支持
  • 我的世界模组开发教程——资源(1)
  • 【AI】入门级提示词模板:适用于ChatGPT、文心一言等主流模型
  • 【ee类保研面试】数学类---线性代数
  • 从0开始学习R语言--Day62--RE插补
  • JVM对象创建与内存分配机制深度剖析
  • 基于Catboost的铁路交通数据分析及列车延误预测系统的设计与实现【全国城市可选、欠采样技术】
  • 【JVM篇11】:分代回收与GC回收范围的分类详解
  • 数据分析师进阶——95页零售相关数据分析【附全文阅读】
  • JVM 性能调优实战:让系统性能 “飞” 起来的核心策略
  • 观远 ChatBI 完成 DeepSeek-R1 大模型适配:开启智能数据分析跃升新篇
  • 【Spring】一文了解SpringMVC的核心功能及工作流程,以及核心组件及注解
  • Linux 日志管理与时钟同步详解
  • GIS工程师面试题
  • GitHub 热门项目 PandaWiki:零门槛搭建智能漏洞库,支持 10 + 大模型接入
  • UG NX二次开发(Python)-根据封闭曲线创建拉伸特征
  • Class27GoogLeNet
  • 实用性方案:高效处理图片拼接的正确打开方式
  • sed编程入门
  • [Agent开发平台] Coze Loop开源 | 前端 | typescript架构API速查