当前位置: 首页 > article >正文

【Java】ForkJoin 框架

在Java中,ForkJoin框架是并行编程的一个重要工具,它主要用于处理可以分解为多个子任务的复杂任务。ForkJoin框架的核心是ForkJoinPool,它是一个线程池,专门用于执行ForkJoinTask任务。通过将大任务分解为多个小任务,并在多个线程中并行执行这些小任务,ForkJoin框架可以显著提高程序的执行效率。

1.核心组件

ForkJoinPool

• 这是ForkJoin框架的核心线程池,用于管理和调度ForkJoinTask任务。

• 它使用工作窃取算法(work-stealing algorithm),允许空闲的线程从其他线程的任务队列中窃取任务来执行,从而提高线程的利用率。

• 示例代码:

    ForkJoinPool pool = new ForkJoinPool();

ForkJoinTask

• 这是ForkJoin框架中任务的抽象基类,所有自定义的任务都需要继承这个类。

• 通常使用RecursiveTask(有返回值的任务)或RecursiveAction(无返回值的任务)这两个子类来实现具体任务。

• 示例代码:

    public class MyTask extends RecursiveTask<Integer> {private int threshold;private int start;private int end;public MyTask(int threshold, int start, int end) {this.threshold = threshold;this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;if (end - start <= threshold) {for (int i = start; i < end; i++) {sum += i;}} else {int middle = (start + end) / 2;MyTask leftTask = new MyTask(threshold, start, middle);MyTask rightTask = new MyTask(threshold, middle, end);leftTask.fork(); // 异步执行子任务rightTask.fork(); // 异步执行子任务sum = leftTask.join() + rightTask.join(); // 等待子任务完成并获取结果}return sum;}}

2.工作窃取算法

• 工作窃取算法是一种高效的线程调度算法,用于解决线程空闲时的负载均衡问题。

• 每个线程都有自己的双端队列(deque),用于存储任务。

• 当一个线程的任务队列为空时,它可以从其他线程的任务队列中“窃取”任务来执行。

• 这种算法可以有效减少线程的空闲时间,提高线程的利用率。

3.使用示例

• 下面是一个完整的使用ForkJoin框架计算数组和的示例:

  import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class ForkJoinExample {public static void main(String[] args) {int[] array = new int[1000000];for (int i = 0; i < array.length; i++) {array[i] = i;}ForkJoinPool pool = new ForkJoinPool();SumTask task = new SumTask(array, 0, array.length);int sum = pool.invoke(task);System.out.println("Sum: " + sum);}}class SumTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 1000;private int[] array;private int start;private int end;public SumTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Integer compute() {if (end - start <= THRESHOLD) {int sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else {int middle = (start + end) / 2;SumTask leftTask = new SumTask(array, start, middle);SumTask rightTask = new SumTask(array, middle, end);leftTask.fork();rightTask.fork();return leftTask.join() + rightTask.join();}}}

4.优势

• 并行处理:通过将任务分解为多个子任务,并在多个线程中并行执行,可以显著提高程序的执行效率。

• 负载均衡:工作窃取算法可以有效解决线程空闲时的负载均衡问题,提高线程的利用率。

• 易于使用:ForkJoin框架提供了简单易用的API,使得并行任务的实现变得非常方便。

5.注意事项

• 任务分解粒度:任务分解的粒度需要适中。如果任务分解得太细,会导致线程的调度开销过大;如果任务分解得太粗,又无法充分利用多核CPU的优势。

• 线程池大小:合理配置ForkJoinPool的线程池大小,通常建议设置为CPU核心数的两倍左右。

• 线程安全:虽然ForkJoin框架本身是线程安全的,但在任务执行过程中,如果需要访问共享资源,仍然需要注意线程安全问题。

通过合理使用ForkJoin框架,可以有效提高Java程序的并行处理能力,从而提升程序的性能。

public class ForkJoinPoolTest extends RecursiveTask<Long> {private static final int THRESHOLD = 500;long[] array;int start;int end;ForkJoinPoolTest(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}public static void main(String[] args) {long[] arrays = new long[1000];long expecteSum = 0;for (int i = 0; i < arrays.length; i++) {arrays[i] = i;expecteSum = expecteSum + arrays[i];}System.out.println(expecteSum);ForkJoinPoolTest forkJoinPoolTest = new ForkJoinPoolTest(arrays, 0, arrays.length);long startTime = System.currentTimeMillis();Long result = ForkJoinPool.commonPool().invoke(forkJoinPoolTest);long endTime = System.currentTimeMillis();System.out.println("fork/join sum:" + result + ",耗时:" + (endTime - startTime));}@Overrideprotected Long compute() {//如果任务太小if (end - start <= THRESHOLD) {System.out.println("start=" + start + ",end=" + end);long sum = 0;for (int i = start; i < end; i++) {sum = sum + this.array[i];try {TimeUnit.MILLISECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}return sum;}int middle = (end + start) / 2;ForkJoinPoolTest joinPoolTest1 = new ForkJoinPoolTest(this.array, start, middle);ForkJoinPoolTest joinPoolTest2 = new ForkJoinPoolTest(this.array, middle, end);invokeAll(joinPoolTest1, joinPoolTest2);Long result1 = joinPoolTest1.join();Long result2 = joinPoolTest2.join();long result = result1 + result2;System.out.println("result = " + (result1 + result2));return result;}}
http://www.lryc.cn/news/2394080.html

相关文章:

  • PHP实战:安全实现文件上传功能教程
  • 桥 接 模 式
  • 基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
  • 多杆合一驱动城市空间治理智慧化
  • 用QT写一个车速表
  • (19)java在区块链中的应用
  • 数控技术应用理实一体化平台VR实训系统
  • C# 将HTML文档、HTML字符串转换为图片
  • 界面控件DevExpress WinForms v24.2新版亮点:富文本编辑器功能全新升级
  • 华为云Flexus+DeepSeek征文|华为云 Flexus X 加速 Dify 平台落地:高性能、低成本、强可靠性的云上选择
  • Jenkins 2.479.1安装和邮箱配置教程
  • MySQL 大战 PostgreSQL
  • DFS入门刷题c++
  • ToolsSet之:十六进制及二进制编辑运算工具
  • 服务器液冷:突破散热瓶颈,驱动算力革命的“冷静”引擎
  • 1.2 HarmonyOS NEXT分布式架构核心技术解析
  • 【Python训练营打卡】day40 @浙大疏锦行
  • MCP Server的五种主流架构:从原理到实践的深度解析
  • 跨协议协同智造新实践:DeviceNet-EtherCAT网关驱动汽车焊接装配效能跃迁
  • 在Linux上安装Docker并配置镜像加速器:从入门到实战
  • 让 Deepseek 写一个尺码计算器
  • 代码随想录算法训练营第60期第五十三天打卡
  • Nacos实战——动态 IP 黑名单过滤
  • 实验设计与分析(第6版,Montgomery)第5章析因设计引导5.7节思考题5.14 R语言解题
  • 在Ubuntu20.04上安装ROS Noetic
  • python里面导入yfinance的时候报错
  • winform LiveCharts2的使用--图表的使用
  • 【计算机网络】IPv6和NAT网络地址转换
  • flutter简单自定义跟随手指滑动的横向指示器
  • 项目日记 -Qt音乐播放器 -搜索模块