当前位置: 首页 > news >正文

【JUC】线程池ThreadPoolTaskExecutor与面试题解读

1、ThreadPoolTaskExecutor 创建线程池

从它的创建和使用说起,创建和使用的代码如下:
创建:

   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix("ExecutorPool-");executor.initialize();return executor;

使用:

 threadPoolTaskExecutor.execute(() -> {//do something...});

上面创建线程池使用的是ThreadPoolTaskExecutor ,点击进入executor.initialize()方法:
ExecutorConfigurationSupport.java

	/*** Set up the ExecutorService.*/public void initialize() {if (logger.isInfoEnabled()) {logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));}if (!this.threadNamePrefixSet && this.beanName != null) {setThreadNamePrefix(this.beanName + "-");}this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);}

再进入initializeExecutor方法:
ThreadPoolTaskExecutor.java

	/*** Note: This method exposes an {@link ExecutorService} to its base class* but stores the actual {@link ThreadPoolExecutor} handle internally.* Do not override this method for replacing the executor, rather just for* decorating its {@code ExecutorService} handle or storing custom state.*/@Overrideprotected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);ThreadPoolExecutor executor;if (this.taskDecorator != null) {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler) {@Overridepublic void execute(Runnable command) {Runnable decorated = taskDecorator.decorate(command);if (decorated != command) {decoratedTaskMap.put(decorated, command);}super.execute(decorated);}};}else {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler);}if (this.allowCoreThreadTimeOut) {executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor;}
ThreadPoolExecutor  executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveSeconds,TimeUnit.SECONDS,queue, threadFactory,rejectedExecutionHandler);

可以看到new ThreadPoolExecutor(),ThreadPoolTaskExecutor 本质是使用ThreadPoolExecutor,那为何要存在ThreadPoolTaskExecutor?
ThreadPoolTaskExecutor的方法基本都是围绕如何创建ThreadPoolExecutor,安全有效设置各种参数,添一些行为等。
在这里插入图片描述
如下面设置MaxPoolSize(最大线程池数),它考虑到并发同步、threadPoolExecutor 是否为null的问题。
ThreadPoolTaskExecutor.java

/*** Set the ThreadPoolExecutor's maximum pool size.* Default is {@code Integer.MAX_VALUE}.* <p><b>This setting can be modified at runtime, for example through JMX.</b>*/
public void setMaxPoolSize(int maxPoolSize) {synchronized (this.poolSizeMonitor) {this.maxPoolSize = maxPoolSize;if (this.threadPoolExecutor != null) {this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);}}
}

再如:
设置队列时,它会创建一个安全的队列,有合适大小的LinkedBlockingQueue或者没有大小的SynchronousQueue。从而避免使用newSingleThreadExecutor这类创建一个不安全的等待队列。
ThreadPoolTaskExecutor.java

/*** Create the BlockingQueue to use for the ThreadPoolExecutor.* <p>A LinkedBlockingQueue instance will be created for a positive* capacity value; a SynchronousQueue else.* @param queueCapacity the specified queue capacity* @return the BlockingQueue instance* @see java.util.concurrent.LinkedBlockingQueue* @see java.util.concurrent.SynchronousQueue*/
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<>(queueCapacity);}else {return new SynchronousQueue<>();}
}

newSingleThreadExecutor方式创建的等待队列中的LinkedBlockingQueue容量是Integer.MAX_VALUE
Executors.java

 public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

LinkedBlockingQueue.java

 public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}

ThreadPoolTaskExecutor的父类ExecutorConfigurationSupport的方法可以设置BeanName、ThreadNamePrefix等。
在这里插入图片描述
总结来说在spring项目中使用ThreadPoolTaskExecutor创建线程池是首推使用的。

2、ThreadPoolExecutor解读

2.1 基本使用介绍

ThreadPoolExecutor  executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveSeconds,TimeUnit.SECONDS,queue, threadFactory,rejectedExecutionHandler);
  • corePoolSize:线程池的核心线程数,定义了最小可以同时运行的线程数量。

  • maximumPoolSize:线程池的最大线程数。队列中存放的任务达到队列容量时,可以同时运行的线程数量变为最大线程数。

  • keepAliveTime:当线程池中的线程数量大于corePoolSize时,如果没有新任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了KeepAliveTime才会被回收销毁。

  • unit:keepAliveTime参数的时间单位,包括DAYS、HOURS、MINUTES、MILLISECONDS等。

  • workQueue:用于保存等待执行任务的阻塞队列。可以选择以下集个阻塞队列:

    • ArrayBlockingQueue:是一个基于数组结构的阻塞队列,此队列按FIFO原则对元素进行排序;
    • LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列;
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列;
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列;
  • threadFactory:用于设置创建线程的工厂,可以通过工厂给每个创造出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字:new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();7)handler:饱和策略。若当前同时运行的线程数量达到最大线程数量并且队列已经被放满,ThreadPoolExecutor定义了一些饱和策略:

    • ThreadPoolExecutor.AbortPolicy:直接抛出RejectedExecutionException异常来拒绝处理新任务;
    • ThreadPoolExecutor.CallerRunsPolicy:只用调用者所在的线程来运行任务,会降低新任务的提交速度,影响程序的整体性能;
    • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉;
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中最近的一个任务,执行当前任务;

    处理流程:

在这里插入图片描述
1.在创建了线程池之后,等待提交过来的任务请求
2.当调用execute()方法添加一个请求任务的时候,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个程序
2.2 如果正在运行的线程数量大于或者等于corePoolSize,那么将这个任务放入队列
2.3 如果这个时候队列满了并且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻执行这个任务
2.4 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
3. 当一个线程完成任务的时候,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定时间(keepAliveTime)时:线程会判断如果当前运行的线程数大于corePoolSize,那么这个线程就会被停掉。
最经典的比喻是银行办理业务,可以很形象的去理解线程池七个核心参数之间的关系:

在这里插入图片描述
银行刚开始上班,然后又一位需要办理业务的顾客1进来,柜员1就来一个帮这位顾客1办理,其他在吃早餐摸鱼。这位顾客1办理完了业务就走了,柜员1坐在柜台上静等。

这时又有一位顾客2过来,这时不是柜员1去接待,而是叫上柜员2,因为这天安排上班的两位柜员(corePoolSize=2)。

顾客3来办理业务,因为上班的柜员已经全部就位了,柜员1有空,柜员1就去接待顾客3。

顾客4来办理业务,因为柜员1和柜员2都在忙,所以顾客4在凳子上静等。柜员2帮顾客2办理完了,就去给顾客4办理。这时凳子又空出3张(Queue容量为3)。

顾客5、顾客6、顾客7同时过来,柜员1和柜员2都在忙,顾客5、顾客6、顾客7就坐满了凳子。

过了一会,顾客8来了,柜员1和柜员2在忙同时没有凳子坐了,本着顾客就是上帝的原则,打电话叫来一个在休息的柜员3帮顾客8办理业务。

生意兴隆,顾客9来了,打电话叫来一个在休息的柜员4帮顾客9办理业务。

这是顾客10来了,一进门,柜员没有了,连休息中的也没有了(maximumPoolSize=4),银行直接赶走他。

过了一段时间,银行的客户都办完业务走了,那两位本来应该休息的柜员说:我们再等一下(keepAliveTime),不忙了我们就撤了。

看上面的场景中,搬救兵请休息中的柜员回来干活是很麻烦的,要先坐满凳子再说。线程池中也是,要等等待队列满了以后才会去创建核心线程数之外的线程,因为创建线程的花销是很大的。

http://www.lryc.cn/news/127667.html

相关文章:

  • 也许你正处于《孤注一掷》中的“团队”,要留心了
  • Kafka 入门到起飞 - 什么是 HW 和 LEO?何时更新HW和LEO呢?
  • go入门实践五-实现一个https服务
  • 面试之快速学习STL-set
  • leetcode 1614.括号的最大嵌套深度
  • Ajax 笔记(四)—— Ajax 进阶
  • Linux 5种网络IO模型
  • Linux多线程【初识线程】
  • Python爬虫的应用场景与技术难点:如何提高数据抓取的效率与准确性
  • Spring Cloud Gateway系例—参数配置(CORS 配置、SSL、元数据)
  • QT:UI控件(按设计师界面导航界面排序)
  • AtCoder Beginner Contest 314-A/B/C
  • 讯飞星火、文心一言和通义千问同时编“贪吃蛇”游戏,谁会胜出?
  • 数学建模之“聚类分析”原理详解
  • 【面试问题】当前系统查询接口需要去另外2个系统库中实时查询返回结果拼接优化思路
  • Scada和lloT有什么区别?
  • Conda(Python管理工具)
  • (14)嵌套列表,Xpath路径表达式,XML增删查改,Implicit,Operator,Xml序列化,浅拷贝与深拷贝
  • 软考笔记 信息管理师 高级
  • 124、SpringMVC处理一个请求的流程是怎样的?
  • 低成本高收益,五金店小程序的秘密武器
  • C语言宏定义详解
  • SwiftUI 动画进阶:实现行星绕圆周轨道运动
  • 物理试题-空气净化器
  • Es、kibana安装教程-ES(二)
  • leetcode 917.仅仅反转字母
  • 有没有推荐的golang的练手项目?
  • springBoot的日志文件
  • Linux学习之iptables的nat表
  • 【数据结构】 ArrayList简介与实战