JAVA中关于多线程的学习和使用
多线程基础
1 概述
现代操作系统(Windows
,macOS
,Linux
)都可以执行多任务。多任务就是同时运行多个任务。例如:播放音乐的同时,浏览器可以进行文件下载,同时可以进行QQ消息的收发。
CPU
执行代码都是一条一条顺序执行的,但是,即使是单核CPU
,也可以同时运行多个任务。因为操作系统执行多任务实际上就是让CPU
对多个任务轮流交替执行。
操作系统轮流让多个任务交替执行,例如,让浏览器执行0.001
秒,让QQ执行0.001
秒,再让音乐播放器执行0.001
秒。在用户使用的体验看来,CPU
就是在同时执行多个任务。
1 进程与线程
程序:程序是含有指令和数据的文件,被存储在磁盘或其他的数据存储设备中,可以理解为程序是包含静态代码的文件。例如:浏览器软件、音乐播放器软件等软件的安装目录和文件。
进程:进程是程序的一次执行过程,是系统运行程序的基本单位。
线程:某些进程内部还需要同时执行多个子任务。例如,我们在使用WPS
时,WPS
可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行自动保存和上传云文档,我们把子任务称为线程。线程是进程划分成的更小的运行单位。
进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个主线程。
2 线程基本概念
单线程:单线程就是进程中只有一个线程。单线程在程序执行时,所走的程序路径按照连续顺序排下来,前面的必须处理好,后面的才会执行
多线程:由一个以上的线程组成的程序称为多线程程序。Java中,一定是从主线程开始执行(main方法),然后在主线程的某个位置创建并启动新的线程。
多线程的应用场景
- 软件中的耗时操作,拷贝的迁移文件,加载大量资源的时候
- 所有的后台服务器
- 所有的聊天软件
3 线程的创建方式
3. 1 方式一:继承 java.lang.Thread
类(线程子类)
public class Demo01 {public static void main(String[] args) {//1.创建一个Thread的子类,重写run方法//2.创建子类对象//3.调用start方法启动MyThread myThread = new MyThread();myThread.start();for (int i = 0; i < 10; i++) {System.out.println("主线程执行任务:"+i);}}
}class MyThread extends Thread {@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println(getName()+"正在执行任务:"+i);}}
}
3. 2 方式二:实现 java.lang.Runnable
接口(线程执行类)
public class Demo02 {public static void main(String[] args) {//方式2:java.lang.Runnable 接口实现多线程//1.创建Runnable的实现类,重写run方法//2.创建Runnable的实现类对象r1//3.创建Thread类的对象,将r1作为构造方法的参数进行创建//4.调用start方法启动线程MyRun myRun = new MyRun();Thread t1 = new Thread(myRun);t1.start();for (int i = 0; i < 10; i++) {System.out.println("主线程执行任务:"+i);}}
}
class MyRun implements Runnable {public void run() {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName()+"线程执行任务:"+i);}}
}
3. 3 方式三:实现 java.util.concurrent.Callable
接口,允许子线程返回结果、抛出异常
public class Demo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {//方式3:Callable接口方式实现多线程//步骤://1.创建Callable的实现类,重写call方法//2.创建Callable的实现类对象//3.创建FutureTask对象,用来进行结果的管理操作//4.创建Thread类的对象,将步骤3的对象作为参数传递//5.启动线程MyCallable callable1 = new MyCallable(1,10);FutureTask<Integer> futureTask1 = new FutureTask<Integer>(callable1);Thread thread1=new Thread(futureTask1);thread1.setName("线程1");thread1.start();MyCallable callable2 = new MyCallable(2,11);FutureTask<Integer> futureTask2 = new FutureTask<Integer>(callable2);Thread thread2=new Thread(futureTask2);thread2.setName("线程2");thread2.start();System.out.println("子线程C1和为:"+futureTask1.get());System.out.println("子线程C2和为:"+futureTask2.get());for (int i=1;i<=10;i++) {System.out.println("主线程:"+i);}}
}class MyCallable implements Callable<Integer> {int begin,end;public MyCallable(int begin,int end) {this.begin = begin;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int i = begin; i <= end; i++) {sum += i;System.out.println(Thread.currentThread().getName()+"正在进行加"+i+"操作");}return sum;}
}
3. 4 方式四:线程池
public class Demo04 {public static void main(String[] args) {//使用线程池创建线程对象ExecutorService ex= Executors.newCachedThreadPool();ex.execute(new MyRun());ex.execute(new MyRun());}
}
总结:
- Thread 编程比较简单,可以直接使用Thread类中的方法 可扩展性差
- Runnable 拓展性强,实现该接口后还可以继承其他的类再实现其他的接口
- Callable 拓展性强,实现该接口还可以继承其他类,可以有返回值
4 线程所拥有的方法
4.1 getName()获取线程名
public class Demo01 {public static void main(String[] args) {//获取当前线程Thread t1=Thread.currentThread();System.out.println("线程对象: "+t1);//获取线程名 getName(),如果没有为线程命名,系统会默认指定线程名,命名规则是Thread-N的形式System.out.println("线程名:"+t1.getName());}
}
4.2 setName()设置线程名
public class Demo02 {public static void main(String[] args) {//给线程设置名字,setName()//让当前线程休眠:Thread.sleep(long ,min) 参数为毫秒值MyThread t1 = new MyThread();t1.setName("土豆");t1.start();//使用构造方法设置线程名MyThread t2 = new MyThread("洋芋");t2.start();//Runnable作为参数Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"线程正在启动");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"线程运行结束");}};Thread thread = new Thread(runnable);thread.setName("马铃薯");thread.start();}
}
4.3 sleep()线程休眠
public class MyThread extends Thread {public MyThread(){}public MyThread(String name) {super(name);}@Overridepublic void run() {System.out.println("当前线程为:"+getName());for(int i=1; i<=5; i++){try {//休眠1秒Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println(i);}}}
}
4.4 setPriority(long newPriority) 设置线程优先级
public class Demo03 {public static void main(String[] args) {Runnable runnable = new Runnable() {public void run() {for (char i = 'a'; i <= 'g'; i++) {System.out.println(Thread.currentThread().getName() + ": " + i);}}};Runnable runnable2 = new Runnable() {public void run() {for (char i = 'A'; i <= 'J'; i++) {System.out.println(Thread.currentThread().getName() + ": " + i);}}};Thread t1=new Thread(runnable,"线程1");Thread t2=new Thread(runnable2,"线程2");//设置线程优先级 1~10,//注意:优先级高的线程抢占到资源的概率较大,但不一定优先抢占到资源t1.setPriority(1);t2.setPriority(10);t1.start();t2.start();//获取线程优先级System.out.println(t1.getName()+"的优先级为"+t1.getPriority());System.out.println(t2.getName()+"的优先级为"+t2.getPriority());System.out.println(Thread.currentThread().getName()+"的优先级为"+Thread.currentThread().getPriority());}
}
4.5 setDaemon设置守护线程
public class Demo04 {public static void main(String[] args) {Thread t1 = new Thread("线程1") {public void run() {for (char i = 'a'; i < 'z'; i++) {System.out.println(getName()+":"+i);}}};Thread t2 = new Thread("线程2") {public void run() {for (char i = 'A'; i < 'Z'; i++) {System.out.println(getName()+":"+i);}}};//t2线程设置为守护线程//细节: 当其他的非守护线程执行完毕后,守护线程会陆陆续续的执行结束t2.setDaemon(true);t1.start();t2.start();}
}
4.6 yield()线程礼让
public class Demo05 {public static void main(String[] args) {Thread t1 = new Thread("线程1") {public void run() {for (char i = 'a'; i < 'z'; i++) {System.out.println(getName()+":"+i);Thread.yield();}}};Thread t2 = new Thread("线程2") {public void run() {Thread.yield();for (char i = 'A'; i < 'Z'; i++) {System.out.println(getName()+":"+i);}}};t1.start();t2.start();}
}
4.7 join()线程插队
public class Demo06 {public static void main(String[] args) throws InterruptedException {Thread t1=new Thread(){public void run(){System.out.println("进到"+getName()+"之中");for(int i=1;i<=100;i++){System.out.println(i);}System.out.println("结束");}};t1.start();t1.join(); //线程的插队,插入当前的线程的前面//主线程执行的任务for(char i='A';i<='z';i++){System.out.println("main:"+i);}}
}
4.8 interrupt()线程的中断
public class Demo07 {public static void main(String[] args) {Thread t1 = new Thread("线程1"){public void run(){//获取当前系统时间long startTime = System.currentTimeMillis();System.out.println("进入到"+getName()+"线程中");try {Thread.sleep(3000);} catch (InterruptedException e) {System.out.println("中断"+getName()+"线程");e.printStackTrace();}System.out.println("结束"+getName()+"线程");long endTime = System.currentTimeMillis();System.out.println(getName()+"运行时间:"+(endTime-startTime));}};t1.start();//让主线程休眠System.out.println("main线程进入");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}//main主线程修改t1线程的中断状态=true//t1线程检测中断状态=true,则抛出InterruptedException,子线程执行结束t1.interrupt();}
}
5 多线程同步
Synchronized
同步锁,简单来说,使用Synchronized
关键字将一段代码逻辑,用一把锁给锁起来,只有获得了这把锁的线程才访问。并且同一时刻, 只有一个线程能持有这把锁, 这样就保证了同一时刻只有一个线程能执行被锁住的代码,从而确保代码的线程安全。
看不懂?没有关系,我也看不懂,简单来说就是,如果有多个线程访问同一个资源,如果在不加锁的情况下,资源会混乱,比如有三个线程同时访问一个数number,并且修改他的值,那么这个值最终大概会达不到想要的结果,产生数据混乱,比如两个线程分别对一个数进行操作,其中一个加100次1,另外一个减100次1,预想的结果应该还是这个数本身,但如果不加锁,那么他极有可能变为别的值。
synchronized 关键字的用法
* 1.加锁:
* 锁对象可以是任意类型的对象,必须要保证多条线程共用一个锁对象
* synchronized(锁对象){
* 操作的共享代码
* }
*
* 某条线程获取到了锁资源,锁关闭,当里面的任务执行完成,锁释放
* 默认情况下,锁是打开状态
*
* 2.锁方法:
* synchronized加到方法上,变成了锁方法,注意锁不能自己指定
* 修饰符 synchronized 返回值类型 方法名(){
* 操作的共享代码
* }
* 锁:同步锁不能自己指定,普通方法锁是当前对象
1 修饰实例方法:
当使用synchronized
修饰实例方法时, 以下两种写法作用和意义相同:
方法1:
public class ShouMai extends Thread {public static int ticket = 1000;public static Object lock = new Object();public ShouMai(String name) {super(name);}//请加锁,保证线程安全@Overridepublic void run() {while (true) {synchronized (ShouMai.class) {if (ticket > 0) {System.out.println(getName() + "正在售卖第" + (1000 - --ticket) + "张票");} else {System.out.println(getName() + ":票已售罄");break;}}}}
}
方式2:
public class ShouMai1 extends Thread {public static int ticket = 1000;public ShouMai1(String name) {super(name);}//加锁,保证线程安全@Overridepublic void run() {while (true) {if(!shoumai()) break;}}public synchronized static boolean shoumai() {if (ticket > 0) {System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票");return true;} else {System.out.println(Thread.currentThread().getName() + ":票已售罄");return false;}}
}
2 修饰静态方法
public class ShouMai2Imp implements Runnable {public static int ticket=1000;@Overridepublic void run() {while (true){if(!shoumai()) break;}}public synchronized static boolean shoumai(){if (ticket<=0) return false;System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票");return true;}
}
3 修饰代码块
synchronized(自定义对象) {//临界区
}
lock锁的使用
在java中,可以是使用Object.lock()方法进行加锁,Object.unlock()方法进行解锁
public class ShouMai3 extends Thread {public static int ticket = 200;private static Lock lock = new ReentrantLock();public ShouMai3(String name) {super(name);}@Overridepublic void run() {while (true) {lock.lock();try {if (ticket > 0) {System.out.println(getName() + "正在售卖第" + (200 - --ticket) + "张票");} else {System.out.println(getName() + ":票已售罄");break;}Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}
}
6 线程的状态
在整个线程的生命周期中,线程的状态有以下6
种:
New
:新建状态,新创建的线程,此时尚未调用start()
方法;Runnable
:运行状态,运行中的线程,已经调用start()
方法,线程正在或即将执行run()
方法;Blocked
:阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行;Waiting
:等待状态,运行中的线程,因为join()
等方法调用,进入等待;Timed Waiting
:计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)
join(等待毫秒值)
等方法,进入计时等待;Terminated
:终止状态,线程已终止,因为run()
方法执行完毕。
当线程启动后,它可以在Runnable
、Blocked
、Waiting
和Timed Waiting
这几个状态之间切换,直到最后变成Terminated
状态,线程终止
1 NEW
新建状态,新创建的线程,此时尚未调用start()方法
public class Demo01 {public static void main(String[] args) {Thread thread=new Thread();System.out.println(thread.getState());}
}
2 RUNNABLE
运行状态,运行中的线程,已经调用start()方法,线程正在或即将执行run方法
public class Demo02 {public static void main(String[] args) {Thread t1=new Thread(){@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println("正在执行任务");}}};//启动线程t1.start();System.out.println(t1.getState());}
}
3 Terminated
终止状态,线程已终止,因为run()方法执行完毕
public class Demo03 {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("正在执行任务");}});//启动线程thread.start();//等待子线程完成任务Thread.sleep(500);//获取thread线程状态System.out.println(thread.getState());}
}
4 BLOCKED与TIMED_WAITING
- BLOCKED 阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行
- TIMED_WAITING 计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)
sleep() 不释放锁资源
wait() 释放锁资源 -- Object中的方法,wait()调用时对象必须要和锁对象一样
public class Demo04 {public static void main(String[] args) throws InterruptedException {Object lock = new Object();Runnable runnable = new Runnable() {@Overridepublic void run() {synchronized (lock) {//如果某条线程获取到锁资源,任务一直执行,不释放锁资源while (true) {//死循环try {Thread.sleep(1000);
// lock.wait(1000);} catch (InterruptedException e) {e.printStackTrace();}break;}}}};Thread thread1 = new Thread(runnable,"线程1");Thread thread2 = new Thread(runnable,"线程2");thread1.start();thread2.start();//让主线程休眠500毫秒,让t1和t2同时竞争一个锁资源Thread.sleep(500);System.out.println(thread1.getName()+":"+thread1.getState());System.out.println(thread2.getName()+":"+thread2.getState());}
}
5 WAITING
等待状态,运行中的线程,因为join()
等方法调用,进入等待;
public class Demo05 {public static void main(String[] args) throws InterruptedException {Object lock = new Object();Runnable runnable = new Runnable() {@Overridepublic void run() {synchronized (lock) {try {lock.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}};Thread thread = new Thread(runnable);thread.start();Thread.sleep(500);System.out.println(thread.getState());}
}
public class Demo06 {public static void main(String[] args) throws InterruptedException {Thread outThread = new Thread(new Runnable() {public void run() {System.out.println("外部线程启动");//在内部又创建了一个线程Thread innerThread = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("内部线程启动");//innerThread任务一直执行while (true){try {Thread.sleep(2000);break;} catch (InterruptedException e) {throw new RuntimeException(e);}}}});innerThread.start();try {innerThread.join(); //innerThread插队,插队插到outThread前面,outThread此时处于等待innerThread完成任务状态,所以outThread等待} catch (InterruptedException e) {throw new RuntimeException(e);}}});outThread.start(); //线程启动//让主线程休眠,目的时为了让outThread运行起来Thread.sleep(100);//获取outThread的状态System.out.println(outThread.getState());}
}
7 线程池
概念
线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
┌─────┐ execute ┌──────────────────┐
│Task1│─────────>│ThreadPool │
├─────┤ │┌───────┐┌───────┐│
│Task2│ ││Thread1││Thread2││
├─────┤ │└───────┘└───────┘│
│Task3│ │┌───────┐┌───────┐│
├─────┤ ││Thread3││Thread4││
│Task4│ │└───────┘└───────┘│
├─────┤ └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘...
线程池常用方法
执行无返回值的线程任务:
void execute(Runnable command);
提交有返回值的线程任务:
Future<T> submit(Callable<T> task);
关闭线程池:
void shutdown();
或shutdownNow();
等待线程池关闭:
boolean awaitTermination(long timeout, TimeUnit unit);
执行线程任务
execute()
只能提交Runnable
类型的任务,没有返回值,而submit()
既能提交Runnable
类型任务也能提交Callable
类型任务,可以返回Future
类型结果,用于获取线程任务执行结果。
execute()
方法提交的任务异常是直接抛出的,而submit()
方法是捕获异常,当调用Future
的get()
方法获取返回值时,才会抛出异常。
线程池分类
Java
标准库提供的几种常用线程池,创建这些线程池的方法都被封装到Executors
工具类中。
FixedThreadPool:线程数固定的线程池,使用
Executors.newFixedThreadPool()
创建;CachedThreadPool:线程数根据任务动态调整的线程池,使用
Executors.newCachedThreadPool()
创建;SingleThreadExecutor:仅提供一个单线程的线程池,使用
Executors.newSingleThreadExecutor()
创建;ScheduledThreadPool:能实现定时、周期性任务的线程池,使用
Executors.newScheduledThreadPool()
创建;
FixedThreadPool
public class Demo01 {public static void main(String[] args) throws InterruptedException {//1.获取线程池对象ExecutorService executorService= Executors.newFixedThreadPool(3);//2.提交任务给线程池对象executorService.execute(new MyRunnable("任务1"));executorService.execute(new MyRunnable("任务2"));executorService.execute(new MyRunnable("任务3"));Thread.sleep(100); //主线程休眠executorService.execute(new MyRunnable("任务4"));//3.线程池的关闭方法 线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务;executorService.shutdown();
// executorService.shutdownNow();/*当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。第一个参数指定的是时间,第二个参数指定的是时间单位(当前是秒)。返回值类型为boolean型。如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true。如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false。如果等待时间没有超过指定时间,则继续等待。*/while (!executorService.awaitTermination(1, TimeUnit.SECONDS)){System.out.println("还没关闭线程池");};System.out.println("已经关闭线程池");}
}class MyRunnable implements Runnable{public String str;public MyRunnable(String str) {this.str = str;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+" 开始执行"+str);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
// for (int i = 0; i < 10; i++) {
// System.out.println(Thread.currentThread().getName()+":"+i);
// }System.out.println(Thread.currentThread().getName()+" 结束"+str);}
}
CachedThreadPool
public class Demo02 {public static void main(String[] args) throws InterruptedException {//创建一个线程数量无上限的线程池对象ExecutorService executorService= Executors.newCachedThreadPool();for (int i = 1; i <= 100; i++) {executorService.execute(new MyRunnable("任务"+i));}Thread.sleep(2000);executorService.execute(new MyRunnable("最后一个任务"));executorService.shutdown();}
}
SingleThreadExecutor
public class Demo03 {public static void main(String[] args) {ExecutorService executorService= Executors.newSingleThreadExecutor();//循环的提交任务for (int i = 1; i < 10; i++) {executorService.execute(new MyRunnable("任务"+i));}executorService.shutdown();}
}
ScheduledThreadPool
public class Demo04 {public static void main(String[] args) {ScheduledExecutorService executorService= Executors.newScheduledThreadPool(2);Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"开始执行");try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+"执行结束");}};//2.提交任务 --延时多久进行任务的执行//schedule(Runnable command,long delay, TimeUnit unit) 任务对象 延迟时间 时间单位
// executorService.schedule(runnable,2,TimeUnit.SECONDS);//scheduleAtFixedRate 延时1秒首次执行此任务,每隔2秒进行此任务的执行executorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);//上一次任务执行完毕后,等待固定的时间间隔,再进行下一次任务
// executorService.scheduleWithFixedDelay(runnable, 2, 3, TimeUnit.SECONDS);}
}
自定义线程池
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors
去创建,而要通过ThreadPoolExecutor
方式。jdk
中Executor
框架虽然提供了如newFixedThreadPool()
、newSingleThreadExecutor()
、newCachedThreadPool()
等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过new ThreadPoolExecutor
方式实现,使用ThreadPoolExecutor
有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
必须为线程池中的线程,按照业务规则,进行命名。可以在创建线程池时,使用自定义线程工厂规范线程命名方式,避免线程使用默认名称。
如何创建一个线程池?
public ThreadPoolExecutor(int corePoolSize, 核心线程数 >=0
int maximumPoolSize, 最大线程数(核心+临时) >corePoolSize
long keepAliveTime, 临时线程存活时间
TimeUnit unit, 存活的时间单位
BlockingQueue<Runnable> workQueue, 等待队列
ThreadFactory threadFactory, 线程工厂
RejectedExecutionHandler handler) 拒绝策略
拒绝策略
ThreadPoolExecutor.AbortPolicy() 默认拒绝策略--抛异常
ThreadPoolExecutor.DiscardOldestPolicy() 丢弃等待时间最久的任务
ThreadPoolExecutor.DiscardPolicy() 丢弃当前的任务
ThreadPoolExecutor.CallerRunsPolicy() 让当前的线程进行任务的执行
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());//提交任务
executorService.submit(new MyRunnable("任务1")); //提交一个任务给executorService对象
executorService.submit(new MyRunnable("任务2"));
executorService.submit(new MyRunnable("任务3"));
executorService.submit(new MyRunnable("任务4")); //核心线程已满,进入等待队列
executorService.submit(new MyRunnable("任务5"));
executorService.submit(new MyRunnable("任务6"));
executorService.submit(new MyRunnable("任务7")); //核心线程已满,等待队列已满,进入临时线程
executorService.submit(new MyRunnable("任务8"));
executorService.submit(new MyRunnable("任务9")); //核心线程已满,等待队列已满,临时线程已满,抛出异常executorService.shutdown();
自定义线程工厂与拒绝策略
自定义线程工厂
class MyThreadFactory implements ThreadFactory {//线程池名称前缀String namePrefix;public MyThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}//具备原子性的Integer类型private AtomicInteger threadNumber = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,namePrefix+"线程"+threadNumber.getAndIncrement());}
}
主要实现ThreadFactory接口中的newThread方法,我写的这个方法也就是对线程池与线程池中的线程进行了重命名
自定义拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("哎呦,达到能执行任务的上限了,(宝ᴗ宝)做不到啊");}
}
在这里呢我自定义了一个拒绝策略,当有一个线程进入到线程池后,核心线程,线程队列,临时线程等都没有空闲位置时,会直接输出一段话,线程不会进入线程池之中。
线程池的状态
线程池的状态分为:RUNNING , SHUTDOWN , STOP , TIDYING , TERMINATED
RUNNING:运行状态,线程池被一旦被创建,就处于RUNNING
状态,并且线程池中的任务数为0
。该状态的线程池会接收新任务,并处理工作队列中的任务。
- 调用线程池的
shutdown()
方法,可以切换到SHUTDOWN关闭状态; - 调用线程池的
shutdownNow()
方法,可以切换到STOP停止状态;
SHUTDOWN :关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;当工作队列为空时,并且线程池中执行的任务也为空时,线程池进入TIDYING状态;
STOP:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行 的任务;线程池中执行的任务为空,进入TIDYING状态;
TIDYING :整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0
;terminated()
执行完毕,进入TERMINATED
状态;
TERMINATED : 终止状态,该状态表示线程池彻底关闭。
线程池的优点
- 提高响应的速度
- 提高线程的可管理性
- 降低资源消耗(线程执行完任务,不销毁,可以执行其他的任务)
线程池核心代码
线程池相关的接口和实现类
线程池技术,是由2
个核心接口和一组实现类组成。
Executor
接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()
方法。未来线程池中所有的线程任务,都将由exuecte()
方法来执行。
ExecutorService
接口继承了Executor
接口,扩展了awaitTermination()
、submit()
、shutdown()
等专门用于管理线程任务的方法。
ExecutorService
接口的抽象实现类AbstractExecutorService
,为不同的线程池实现类,提供submit()
、invokeAll()
等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()
执行策略不同,所以在AbstractExecutorService
并未提供该方法的具体实现。
AbstractExecutorService
有两个常见的子类ThreadPoolExecutor
和ForkJoinPool
,用于实现不同的线程池。
ThreadPoolExecutor
线程池通过Woker
工作线程、BlockingQueue
阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;
ForkJoinPool
是一个基于分治思想的线程池实现类,通过分叉(fork
)合并(join
)的方式,将一个大任务拆分成多个小任务,并且为每个工作线程提供一个工作队列,减少竞争,实现并行的线程任务执行方式,所以ForkJoinPool适合计算密集型场景,是ThreadPoolExecutor线程池的一种补充。
ScheduledThreadPoolExecutor
类是ThreadPoolExecutor
类的子类,按照时间周期执行线程任务的线程池实现类,通常用于作业调度相关的业务场景。由于该线程池的工作队列使用DelayedWorkQueue
,这是一个按照任务执行时间进行排序的优先级工作队列,所以这也是ScheduledThreadPoolExecutor
线程池能按照时间周期来执行线程任务的主要原因。
工作线程Worker类
每个Woker
类的对象,都代表线程池中的一个工作线程。
当ThreadPoolExecutor
线程池,通过exeute()
方法执行1
个线程任务时,会调用addWorker()
方法创建一个Woker
工作线程对象。并且,创建好的Worker
工作线程对象,会被添加到一个HashSet<Worker> workders
工作线程集合,统一由线程池进行管理。
通过阅读源代码,可以看出Worker
类是ThreadPoolExecutor
类中定义的一个私有内部类,保存了每个Worker
工作线程要执行的Runnable
线程任务和Thread
线程对象。
当创建Worker
工作线程时,会通过构造方法保存Runnable
线程任务,同时使用ThreadFactory
线程工厂,为该工作线程创建一个Thread
线程对象。通过这样的操作,每个Worker
工作线程对象,都将绑定一个真正的Thread
线程。
另外,当Thread
线程被JVM
调度执行时,线程将会自动执行Worker
工作线程对象的run()
方法,通过调用runWorker()
方法,最终实现Woker
工作线程中所保存的Runnable
线程任务的执行。
值得重视的是:当Worker
工作线程,在第一次执行完成线程任务后,这个Worker
工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()
方法,获取阻塞工作队列中新的Runnable
线程任务,并通过当前Worker
工作线程中所绑定Thread
线程,完成新线程任务的执行,从而实现了线程池的中Thread
线程的重复使用。
核心方法:execute()方法
ThreadPoolExecutor
线程池中,会通过execute(Runnable command)
方法执行Runnable
类型的线程任务。
完整实现了Executor
接口定义execute()
方法,这个方法作用是执行一个Runnable
类型的线程任务。整体的执行流程是:
首先,通过
AtomicInteger
类型的ctl
对象,获取线程池的状态和工作线程数;然后,判断当前线程池中的工作线程数;
如果,工作线程的数量小于核心线程数,则通过
addWorker()
方法,创建新的Worker
工作线程,并添加至workers
工作线程集合;如果,工作线程的数量大于核心线程数,并且线程池处于
RUNNING
状态,那么,线程池会将Runnable
类型的线程任务,缓存至workQueue
阻塞工作队列,等待某个空闲工作线程获取并执行该任务;如果,
workQueue
工作队列缓存线程任务失败,代表工作队列已满。那么,线程池会重新通过addWorker()
方法,尝试创建新的工作线程;这次创建时,会判断工作线程数是否超出最大线程数。如果没有超出,会创建新的工作线程;如果已经超出,则返回
false
,代表创建失败;如果创建失败,线程池执行拒绝策略;
public class ThreadPoolExecutor
{// 线程池执行Runnable线程任务public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池的状态和工作线程数int c = ctl.get();// 工作线程的数量小于核心线程数if (workerCountOf(c) < corePoolSize) {// 创建新的Worker工作线程if (addWorker(command, true))return;// 创建失败,重新获取线程池的状态和工作线程数c = ctl.get();}// 如果线程池处于RUNNING状态,缓存线程任务至工作队列if (isRunning(c) && workQueue.offer(command)) {// 任务缓存成功// 重新获取线程池的状态和工作线程数int recheck = ctl.get();// 如果线程池不是处于RUNNING状态,则删除任务if (! isRunning(recheck) && remove(command))// 执行拒绝策略reject(command);// 如果工作线程数等于零// 通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果缓存线程任务至工作队列// 尝试创建新的工作线程// 创建时,判断工作线程数是否超出最大线程数// 如果没有超出,创建成功// 如果已经超出,创建失败else if (!addWorker(command, false))// 执行拒绝策略reject(command);}
}
核心方法:addWorker()方法
在execute()
方法的执行过程中,会通过addWorker()
方法创建一个工作线程,用于执行当前线程任务。
阅读源代码,会发现,这个方法的整个执行过程可以分为两个部分:检查线程池的状态和工作线程数量、创建并执行工作线程。
1 检查线程池的状态和工作线程数量
private boolean addWorker(Runnable firstTask, boolean core) {// 第1部分:检查线程池的状态和工作线程数量// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {// 通过ctl对象,获取当前线程池的运行状态int c = ctl.get();int rs = runStateOf(c);// 如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)// 则工作线程创建失败if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 检查工作线程数量for (;;) {// 通过ctl对象,获取当前线程池中工作线程数量int wc = workerCountOf(c);// 工作线程数量如果超出最大容量或者核心线程数(最大线程数)// 则工作线程创建失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;// 再次获取线程池状态,检查是否发生变化c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 第2部分:创建并执行工作线程....
}
2 创建并执行工作线程
private boolean addWorker(Runnable firstTask, boolean core) {// 第1部分:检查线程池的状态和工作线程数量....// 第2部分:创建并执行工作线程....boolean workerStarted = false; // 工作线程是否已经启动boolean workerAdded = false; // 工作线程是否已经保存Worker w = null;try {// 创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);// 获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread;if (t != null) {// 获取线程池的ReentrantLock主锁对象// 确保在添加和启动线程时的同步与安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查线程池状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查Thread线程对象的状态是否已经处于启动状态if (t.isAlive()) throw new IllegalThreadStateException();// 保存工作线程workers.add(w);// 记录线程池曾经达到过的最大工作线程数量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 添加工作线程后,正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}// 返回线程启动状态return workerStarted;
}