当前位置: 首页 > news >正文

Java并发编程实战——结构化并发应用程序

文章目录

  • 6 任务执行
    • 6.1 在线程中执行任务
      • 6.1.1 串行地执行任务
      • 6.1.2 显式地为任务创建线程
      • 6.1.3 无限制创建线程的不足
    • 6.2 Executor框架
      • 6.2.1 示例:基于Executor的Web服务器
      • 6.2.2 执行策略
      • 6.2.3 线程池
      • 6.2.4 Executor的生命周期
      • 6.2.5 延迟任务与周期任务
    • 6.3 找出可利用的并行性
      • 6.3.1 示例:串行的页面渲染器
      • 6.3.2 携带结果的任务Callable和Future
      • 6.3.3 示例:使用Future实现页面渲染器
      • 6.3.4 在异构任务并行化中存在的局限
      • 6.3.5 CompletionService:Executor与BlockingQueue
      • 6.3.6 使用CompletionService实现页面渲染器
      • 6.3.7 为任务设置时间
      • 6.3.8 示例:旅行预订门户网站

6 任务执行

6.1 在线程中执行任务

应选择清晰的任务边界以及明确的任务执行策略。

一种自然的任务边界选择方式:以独立的客户请求为边界。

6.1.1 串行地执行任务

6.1.2 显式地为任务创建线程

程序6-1-2-1 在Web服务器中为每一个请求启动一个新的线程(不要这么做)

class ThreadPerTaskWebServer {public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {final Socket  connection = socket.accept();Runnable task = new Runnable() {@Overridepublic void run() {handleRequest(connection);}};new Thread(task).start();}}
}

串行情况下可以提升性能。只要请求的到达速率不超出服务器的请求处理能力。

6.1.3 无限制创建线程的不足

线程生命周期的开销非常高。 线程创建需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助操作。
资源消耗。 活跃线程会消耗系统资源,尤其是内存。大量空闲的线程会占用许多内存,给垃圾收集器带来压力,而且大量线程在竞争CPU资源时还会产生其他的性能开销。如果有足够多的线程使所有的CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。
稳定性。 在一定范围内,增加线程可以提高系统的吞吐率,但是如果超出了这个范围,再创建更多的线程会降低程序的执行速度。如果过多地创建一个线程,那么整个应用程序将崩溃。

6.2 Executor框架

Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程相当于消费者(执行完这些工作单元)。

6.2.1 示例:基于Executor的Web服务器

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;class TaskExecutionWebServer {private static final int NTHREADS = 100;private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {@Overridepublic void run() {handleRequest(connection);}};exec.execute(task);}}
}

6.2.2 执行策略

使用线程池来取代new Thread方式。

6.2.3 线程池

  • newFixedThreadPool。固定长度的线程池。
  • newCachedThreadPool。如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程;而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor。创建单个工作线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。
  • newScheduledThreadPool。固定长度的线程池。以延迟或者定时的方式来执行任务。

6.2.4 Executor的生命周期

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;public class LifeCycleWebServer {private final ExecutorService exec = ...;public void start() throws IOException {ServerSocket socket = new ServerSocket(80);while (!exec.isShutdown()) {try {final Socket conn = socket.accept();exec.execute(new Runnable() {@Overridepublic void run() {handleRequest(conn);}});} catch (RejectedExecutionException e) {if (!exec.isShutdown()) {log("task submission rejected", e);}}}}public void stop() {exec.shutdown();}void handleRequest(Socket connection) {Request req = readRequest(connection);if (isShutdownRequest(req)) {stop();} else {dispatchRequest(req);}}
}

6.2.5 延迟任务与周期任务

推荐使用ScheduleThreadPoolExecutor。

Timer存在的问题:
(1)Timer支持基于绝对时间的调度机制,因此任务的执行对系统时钟变化很敏感。ScheduleThreadPoolExecutor只支持基于相对时间的变化。
(2)Timer执行所有的定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。例如某个周期TimerTask需要每10ms执行一次,而另一个需要40ms,那么这个周期任务或者在40ms之后快速连续调用四次,或者彻底丢失4次调用。
(3)Timer线程并不捕获异常。如果TimerTask抛出异常终止了定时任务,timer不会回复线程执行,而是会错误的认为整个Timer都被取消了。 因此,已被调度但尚未执行的TimerTask将不会再执行,新的任务也不会被调度。

程序清单 6-2-5-1 错误的Timer行为


import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;public class OutOfTime {public static void main(String[] args) throws Exception {long startTime = System.currentTimeMillis();Timer timer = new Timer();timer.schedule(new ThrowTask(), 1);TimeUnit.SECONDS.sleep(1);long endTime = System.currentTimeMillis();System.out.println(endTime - startTime);timer.schedule(new ThrowTask(), 1);TimeUnit.SECONDS.sleep(5);}static class ThrowTask extends TimerTask {@Overridepublic void run() {throw new RuntimeException();}}
}Exception in thread "Timer-0" java.lang.RuntimeExceptionat executor.OutOfTime$ThrowTask.run(OutOfTime.java:22)at java.base/java.util.TimerThread.mainLoop(Timer.java:566)at java.base/java.util.TimerThread.run(Timer.java:516)
1002
Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled.at java.base/java.util.Timer.sched(Timer.java:409)at java.base/java.util.Timer.schedule(Timer.java:205)at executor.OutOfTime.main(OutOfTime.java:15)

程序一秒钟就结束了,并抛出了异常。

在Java5.0或者更高的JDK中,将很少使用Timer。

6.3 找出可利用的并行性

6.3.1 示例:串行的页面渲染器

import java.util.ArrayList;
import java.util.List;public class SingleThreadRender {void renderPage(CharSequence source) {renderText(source);List<ImageData> imageData = new ArrayList<>();for (ImageInfo imageInfo : scanForImageInfo(source)) {imageData.add(imageInfo.downloadImage());}for (ImageData data : imageData) {renderImage(data);}}
}

6.3.2 携带结果的任务Callable和Future

6.3.3 示例:使用Future实现页面渲染器

程序清单6-3-3-1 使用Future等待图像下载

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;public class FutureRender {private final ExecutorService executor = ...;void renderPage(CharSequence source) {final List<ImageInfo> imageInfos = scanForImageInfo(source);Callable<List<ImageData>> task = new Callable<List<ImageData>>() {@Overridepublic List<ImageData> call() throws Exception {List<ImageData> result = new ArrayList<>();for (ImageInfo imageInfo : imageInfos) {result.add(imageInfo.downloadImage());}return result;}};Future<List<ImageData>> future = executor.submit(task);renderText(source);try {List<ImageData> imageDataList = future.get();for (ImageData imageData : imageDataList) {renderImage(imageData);}} catch (InterruptedException e) {// 重新设置线程的中断状态Thread.currentThread().interrupt();// 由于不需要结果,因此取消任务future.cancel(true);} catch (ExecutionException e) {throw launcherThrowable(e.getCause());}}}

6.3.4 在异构任务并行化中存在的局限

只有大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

6.3.5 CompletionService:Executor与BlockingQueue

6.3.6 使用CompletionService实现页面渲染器

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;public class Render {private final ExecutorService excutor;Render(ExecutorService excutor) {this.excutor = excutor;}void renderPage(CharSequence source) {List<ImageInfo> info = scanForImageInfo(source);CompletionService<ImageData> completionService = new ExecutorCompletionService<>(excutor);for (final ImageInfo imageInfo: info) {completionService.submit(new Callable<ImageData>() {@Overridepublic ImageData call() throws Exception {return imageInfo.downloadImage();}});}renderText(source);try {for (int t = 0, n = info.size(); t < n; t++) {Future<ImageData> f = completionService.take();ImageData imageData = f.get();renderImage(imageData);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} catch (ExecutionException e) {throw launcherThrowable(e.getCause());}}
}

6.3.7 为任务设置时间

    Page renderPageWithAd() throws InterruptedException {long endNanos = System.nanoTime() + END_BUDGET;Future<Ad> f = excutor.submit(new FetchAdTask());// 在等待广告的同时显示页面。Page page = renderPageBody();Ad ad;try {long timeLeft = endNanos - System.nanoTime();ad = f.get(timeLeft, TimeUnit.NANOSECONDS);} catch (ExecutionException e) {ad = DEFAULT_AD;} catch (TimeoutException e) {ad = DEFAULT_AD;f.cancel(true);}page.setAd(ad);return page;}

6.3.8 示例:旅行预订门户网站

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;private class QuoteTask implements Callable<TravelQuote> {private final TravelCompany company;private final TravelInfo travelInfo;@Overridepublic TravelQuote call() throws Exception {return company.solicitQuote(travelInfo);}
}public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {List<QuoteTask> tasks = new ArrayList<>();for (TravelCompany company : companies) {tasks.add(new QuoteTask(company, travelInfo));}List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);List<TravelQuote> quotes = new ArrayList<>(futures.size());Iterator<QuoteTask> iterator = tasks.iterator();for (Future<TravelQuote> f : futures) {QuoteTask task = iterator.next();try {quotes.add(f.get());} catch (ExecutionException e) {quotes.add(task.getFailureQuote(e.getCause()));} catch (CancellationException e) {quotes.add(task.getTimeoutQuote(e.getCause()));}}Collections.sort(quotes, ranking);return quotes;
}
http://www.lryc.cn/news/116314.html

相关文章:

  • uniapp echarts 点击失效
  • 手机开启应急预警通知 / 地震预警
  • 2020年12月 Python(一级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • 遇到无法复现的 Bug
  • 虚拟列表的实现(简单易懂)
  • 【WordPress】如何在WordPress中实现真·页面路由
  • Android界面设计与用户体验
  • 基于 FFmpeg 的跨平台视频播放器简明教程(八):音画同步
  • 【NLP pytorch】基于BiLSTM-CRF模型医疗数据实体识别实战(项目详解)
  • 人工智能原理(1)
  • 预测成真,国内传来三个消息,中国年轻人变了,创新力产品崛起
  • 维深(Wellsenn):2023中国消费端VR内容开发商调研报告(附下载
  • redis事务管理详解
  • 国产低功耗蓝牙HS6621CxC/6621Px系列支持Find My网络功能方案芯片
  • 【openGauss】分区表的介绍与使用
  • 代码随想录算法训练营day57
  • 【基础类】—前后端通信类系统性学习
  • vite项目中使用@代表根路径
  • 冶金化工操作VR虚拟仿真实验软件提高员工们协同作业的配合度
  • SQL Server数据库 -- 索引与视图
  • 2023 java web面试秘籍
  • 2023-08-05力扣今日二题
  • stl_list类(使用+实现)(C++)
  • 利用hfish反控境外攻击源主机
  • 4、Rocketmq之存储原理
  • 在线原型设计工具有好用的吗?就是这10个
  • Vc - Qt - QPainter translate
  • Spark Catalog详解
  • 【Spring专题】手写简易Spring容器过程分析
  • fastadmin自定义键值组件Fieldlist