当前位置: 首页 > news >正文

JAVA中关于多线程的学习和使用

多线程基础

1  概述

现代操作系统(WindowsmacOSLinux)都可以执行多任务。多任务就是同时运行多个任务。例如:播放音乐的同时,浏览器可以进行文件下载,同时可以进行QQ消息的收发。

CPU执行代码都是一条一条顺序执行的,但是,即使是单核CPU,也可以同时运行多个任务。因为操作系统执行多任务实际上就是让CPU对多个任务轮流交替执行。

操作系统轮流让多个任务交替执行,例如,让浏览器执行0.001秒,让QQ执行0.001秒,再让音乐播放器执行0.001秒。在用户使用的体验看来,CPU就是在同时执行多个任务。

1  进程与线程

程序:程序是含有指令和数据的文件,被存储在磁盘或其他的数据存储设备中,可以理解为程序是包含静态代码的文件。例如:浏览器软件、音乐播放器软件等软件的安装目录和文件。

进程:进程是程序的一次执行过程,是系统运行程序的基本单位。

线程:某些进程内部还需要同时执行多个子任务。例如,我们在使用WPS时,WPS可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行自动保存和上传云文档,我们把子任务称为线程。线程是进程划分成的更小的运行单位。

进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个主线程。

2  线程基本概念

单线程:单线程就是进程中只有一个线程。单线程在程序执行时,所走的程序路径按照连续顺序排下来,前面的必须处理好,后面的才会执行

多线程:由一个以上的线程组成的程序称为多线程程序。Java中,一定是从主线程开始执行(main方法),然后在主线程的某个位置创建并启动新的线程。

多线程的应用场景

  • 软件中的耗时操作,拷贝的迁移文件,加载大量资源的时候
  • 所有的后台服务器
  • 所有的聊天软件

3  线程的创建方式

3. 1 方式一:继承 java.lang.Thread 类(线程子类)

public class Demo01 {public static void main(String[] args) {//1.创建一个Thread的子类,重写run方法//2.创建子类对象//3.调用start方法启动MyThread myThread = new MyThread();myThread.start();for (int i = 0; i < 10; i++) {System.out.println("主线程执行任务:"+i);}}
}class MyThread extends Thread {@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println(getName()+"正在执行任务:"+i);}}
}

3. 2 方式二:实现 java.lang.Runnable 接口(线程执行类)

public class Demo02 {public static void main(String[] args) {//方式2:java.lang.Runnable  接口实现多线程//1.创建Runnable的实现类,重写run方法//2.创建Runnable的实现类对象r1//3.创建Thread类的对象,将r1作为构造方法的参数进行创建//4.调用start方法启动线程MyRun myRun = new MyRun();Thread t1 = new Thread(myRun);t1.start();for (int i = 0; i < 10; i++) {System.out.println("主线程执行任务:"+i);}}
}
class MyRun implements Runnable {public void run() {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName()+"线程执行任务:"+i);}}
}

3. 3 方式三:实现 java.util.concurrent.Callable 接口,允许子线程返回结果、抛出异常

public class Demo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {//方式3:Callable接口方式实现多线程//步骤://1.创建Callable的实现类,重写call方法//2.创建Callable的实现类对象//3.创建FutureTask对象,用来进行结果的管理操作//4.创建Thread类的对象,将步骤3的对象作为参数传递//5.启动线程MyCallable callable1 = new MyCallable(1,10);FutureTask<Integer> futureTask1 = new FutureTask<Integer>(callable1);Thread thread1=new Thread(futureTask1);thread1.setName("线程1");thread1.start();MyCallable callable2 = new MyCallable(2,11);FutureTask<Integer> futureTask2 = new FutureTask<Integer>(callable2);Thread thread2=new Thread(futureTask2);thread2.setName("线程2");thread2.start();System.out.println("子线程C1和为:"+futureTask1.get());System.out.println("子线程C2和为:"+futureTask2.get());for (int i=1;i<=10;i++) {System.out.println("主线程:"+i);}}
}class MyCallable implements Callable<Integer> {int begin,end;public MyCallable(int begin,int end) {this.begin = begin;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int i = begin; i <= end; i++) {sum += i;System.out.println(Thread.currentThread().getName()+"正在进行加"+i+"操作");}return sum;}
}

3. 4 方式四:线程池

public class Demo04 {public static void main(String[] args) {//使用线程池创建线程对象ExecutorService ex= Executors.newCachedThreadPool();ex.execute(new MyRun());ex.execute(new MyRun());}
}

总结:

  • Thread   编程比较简单,可以直接使用Thread类中的方法  可扩展性差
  • Runnable  拓展性强,实现该接口后还可以继承其他的类再实现其他的接口
  • Callable   拓展性强,实现该接口还可以继承其他类,可以有返回值

4  线程所拥有的方法

4.1  getName()获取线程名

public class Demo01 {public static void main(String[] args) {//获取当前线程Thread t1=Thread.currentThread();System.out.println("线程对象: "+t1);//获取线程名  getName(),如果没有为线程命名,系统会默认指定线程名,命名规则是Thread-N的形式System.out.println("线程名:"+t1.getName());}
}

4.2  setName()设置线程名

public class Demo02 {public static void main(String[] args) {//给线程设置名字,setName()//让当前线程休眠:Thread.sleep(long ,min)  参数为毫秒值MyThread t1 = new MyThread();t1.setName("土豆");t1.start();//使用构造方法设置线程名MyThread t2 = new MyThread("洋芋");t2.start();//Runnable作为参数Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"线程正在启动");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"线程运行结束");}};Thread thread = new Thread(runnable);thread.setName("马铃薯");thread.start();}
}

4.3  sleep()线程休眠

public class MyThread extends Thread {public MyThread(){}public MyThread(String name) {super(name);}@Overridepublic void run() {System.out.println("当前线程为:"+getName());for(int i=1; i<=5; i++){try {//休眠1秒Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println(i);}}}
}

4.4  setPriority(long newPriority)  设置线程优先级

public class Demo03 {public static void main(String[] args) {Runnable runnable = new Runnable() {public void run() {for (char i = 'a'; i <= 'g'; i++) {System.out.println(Thread.currentThread().getName() + ": " + i);}}};Runnable runnable2 = new Runnable() {public void run() {for (char i = 'A'; i <= 'J'; i++) {System.out.println(Thread.currentThread().getName() + ": " + i);}}};Thread t1=new Thread(runnable,"线程1");Thread t2=new Thread(runnable2,"线程2");//设置线程优先级  1~10,//注意:优先级高的线程抢占到资源的概率较大,但不一定优先抢占到资源t1.setPriority(1);t2.setPriority(10);t1.start();t2.start();//获取线程优先级System.out.println(t1.getName()+"的优先级为"+t1.getPriority());System.out.println(t2.getName()+"的优先级为"+t2.getPriority());System.out.println(Thread.currentThread().getName()+"的优先级为"+Thread.currentThread().getPriority());}
}

4.5  setDaemon设置守护线程

public class Demo04 {public static void main(String[] args) {Thread t1 = new Thread("线程1") {public void run() {for (char i = 'a'; i < 'z'; i++) {System.out.println(getName()+":"+i);}}};Thread t2 = new Thread("线程2") {public void run() {for (char i = 'A'; i < 'Z'; i++) {System.out.println(getName()+":"+i);}}};//t2线程设置为守护线程//细节:  当其他的非守护线程执行完毕后,守护线程会陆陆续续的执行结束t2.setDaemon(true);t1.start();t2.start();}
}

4.6  yield()线程礼让

public class Demo05 {public static void main(String[] args) {Thread t1 = new Thread("线程1") {public void run() {for (char i = 'a'; i < 'z'; i++) {System.out.println(getName()+":"+i);Thread.yield();}}};Thread t2 = new Thread("线程2") {public void run() {Thread.yield();for (char i = 'A'; i < 'Z'; i++) {System.out.println(getName()+":"+i);}}};t1.start();t2.start();}
}

4.7  join()线程插队

public class Demo06 {public static void main(String[] args) throws InterruptedException {Thread t1=new Thread(){public void run(){System.out.println("进到"+getName()+"之中");for(int i=1;i<=100;i++){System.out.println(i);}System.out.println("结束");}};t1.start();t1.join();  //线程的插队,插入当前的线程的前面//主线程执行的任务for(char i='A';i<='z';i++){System.out.println("main:"+i);}}
}

 4.8  interrupt()线程的中断

public class Demo07 {public static void main(String[] args) {Thread t1 = new Thread("线程1"){public void run(){//获取当前系统时间long startTime = System.currentTimeMillis();System.out.println("进入到"+getName()+"线程中");try {Thread.sleep(3000);} catch (InterruptedException e) {System.out.println("中断"+getName()+"线程");e.printStackTrace();}System.out.println("结束"+getName()+"线程");long endTime = System.currentTimeMillis();System.out.println(getName()+"运行时间:"+(endTime-startTime));}};t1.start();//让主线程休眠System.out.println("main线程进入");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}//main主线程修改t1线程的中断状态=true//t1线程检测中断状态=true,则抛出InterruptedException,子线程执行结束t1.interrupt();}
}

5  多线程同步

Synchronized同步锁,简单来说,使用Synchronized关键字将一段代码逻辑,用一把锁给锁起来,只有获得了这把锁的线程才访问。并且同一时刻, 只有一个线程能持有这把锁, 这样就保证了同一时刻只有一个线程能执行被锁住的代码,从而确保代码的线程安全。

看不懂?没有关系,我也看不懂,简单来说就是,如果有多个线程访问同一个资源,如果在不加锁的情况下,资源会混乱,比如有三个线程同时访问一个数number,并且修改他的值,那么这个值最终大概会达不到想要的结果,产生数据混乱,比如两个线程分别对一个数进行操作,其中一个加100次1,另外一个减100次1,预想的结果应该还是这个数本身,但如果不加锁,那么他极有可能变为别的值。

synchronized 关键字的用法

* 1.加锁:
*  锁对象可以是任意类型的对象,必须要保证多条线程共用一个锁对象
*      synchronized(锁对象){
*          操作的共享代码
*      }
*
* 某条线程获取到了锁资源,锁关闭,当里面的任务执行完成,锁释放
* 默认情况下,锁是打开状态
*
* 2.锁方法:
*    synchronized加到方法上,变成了锁方法,注意锁不能自己指定
*    修饰符  synchronized  返回值类型  方法名(){
*        操作的共享代码
*    }
*  锁:同步锁不能自己指定,普通方法锁是当前对象

1  修饰实例方法:

当使用synchronized 修饰实例方法时, 以下两种写法作用和意义相同:

方法1:

public class ShouMai extends Thread {public static int ticket = 1000;public static Object lock = new Object();public ShouMai(String name) {super(name);}//请加锁,保证线程安全@Overridepublic void run() {while (true) {synchronized (ShouMai.class) {if (ticket > 0) {System.out.println(getName() + "正在售卖第" + (1000 - --ticket) + "张票");} else {System.out.println(getName() + ":票已售罄");break;}}}}
}

方式2:

public class ShouMai1 extends Thread {public static int ticket = 1000;public ShouMai1(String name) {super(name);}//加锁,保证线程安全@Overridepublic void run() {while (true) {if(!shoumai()) break;}}public synchronized static boolean shoumai() {if (ticket > 0) {System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票");return true;} else {System.out.println(Thread.currentThread().getName() + ":票已售罄");return false;}}
}

2  修饰静态方法

public class ShouMai2Imp implements Runnable {public static int ticket=1000;@Overridepublic void run() {while (true){if(!shoumai()) break;}}public synchronized static boolean shoumai(){if (ticket<=0) return false;System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票");return true;}
}

3  修饰代码块

synchronized(自定义对象) {//临界区
}

lock锁的使用

在java中,可以是使用Object.lock()方法进行加锁,Object.unlock()方法进行解锁

public class ShouMai3 extends Thread {public static int ticket = 200;private static Lock lock = new ReentrantLock();public ShouMai3(String name) {super(name);}@Overridepublic void run() {while (true) {lock.lock();try {if (ticket > 0) {System.out.println(getName() + "正在售卖第" + (200 - --ticket) + "张票");} else {System.out.println(getName() + ":票已售罄");break;}Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}
}

6  线程的状态

在整个线程的生命周期中,线程的状态有以下6种:

  • New:新建状态,新创建的线程,此时尚未调用start()方法;
  • Runnable:运行状态,运行中的线程,已经调用start()方法,线程正在或即将执行run()方法;
  • Blocked:阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行;
  • Waiting:等待状态,运行中的线程,因为join()等方法调用,进入等待;
  • Timed Waiting:计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)join(等待毫秒值)等方法,进入计时等待;
  • Terminated:终止状态,线程已终止,因为run()方法执行完毕。

当线程启动后,它可以在RunnableBlockedWaitingTimed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止

1  NEW

新建状态,新创建的线程,此时尚未调用start()方法

public class Demo01 {public static void main(String[] args) {Thread thread=new Thread();System.out.println(thread.getState());}
}

2  RUNNABLE

运行状态,运行中的线程,已经调用start()方法,线程正在或即将执行run方法

public class Demo02 {public static void main(String[] args) {Thread t1=new Thread(){@Overridepublic void run() {for (int i = 0; i < 10; i++) {System.out.println("正在执行任务");}}};//启动线程t1.start();System.out.println(t1.getState());}
}

3  Terminated 

终止状态,线程已终止,因为run()方法执行完毕

public class Demo03 {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("正在执行任务");}});//启动线程thread.start();//等待子线程完成任务Thread.sleep(500);//获取thread线程状态System.out.println(thread.getState());}
}

4  BLOCKED与TIMED_WAITING

  • BLOCKED  阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行
  • TIMED_WAITING  计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)

     sleep()  不释放锁资源
wait()   释放锁资源  --  Object中的方法,wait()调用时对象必须要和锁对象一样

public class Demo04 {public static void main(String[] args) throws InterruptedException {Object lock = new Object();Runnable runnable = new Runnable() {@Overridepublic void run() {synchronized (lock) {//如果某条线程获取到锁资源,任务一直执行,不释放锁资源while (true) {//死循环try {Thread.sleep(1000);
//                            lock.wait(1000);} catch (InterruptedException e) {e.printStackTrace();}break;}}}};Thread thread1 = new Thread(runnable,"线程1");Thread thread2 = new Thread(runnable,"线程2");thread1.start();thread2.start();//让主线程休眠500毫秒,让t1和t2同时竞争一个锁资源Thread.sleep(500);System.out.println(thread1.getName()+":"+thread1.getState());System.out.println(thread2.getName()+":"+thread2.getState());}
}

5  WAITING

等待状态,运行中的线程,因为join()等方法调用,进入等待;

public class Demo05 {public static void main(String[] args) throws InterruptedException {Object lock = new Object();Runnable runnable = new Runnable() {@Overridepublic void run() {synchronized (lock) {try {lock.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}};Thread thread = new Thread(runnable);thread.start();Thread.sleep(500);System.out.println(thread.getState());}
}
public class Demo06 {public static void main(String[] args) throws InterruptedException {Thread outThread = new Thread(new Runnable() {public void run() {System.out.println("外部线程启动");//在内部又创建了一个线程Thread innerThread = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("内部线程启动");//innerThread任务一直执行while (true){try {Thread.sleep(2000);break;} catch (InterruptedException e) {throw new RuntimeException(e);}}}});innerThread.start();try {innerThread.join();  //innerThread插队,插队插到outThread前面,outThread此时处于等待innerThread完成任务状态,所以outThread等待} catch (InterruptedException e) {throw new RuntimeException(e);}}});outThread.start();  //线程启动//让主线程休眠,目的时为了让outThread运行起来Thread.sleep(100);//获取outThread的状态System.out.println(outThread.getState());}
}

7  线程池

概念

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。

┌─────┐ execute  ┌──────────────────┐
│Task1│─────────>│ThreadPool        │
├─────┤          │┌───────┐┌───────┐│
│Task2│          ││Thread1││Thread2││
├─────┤          │└───────┘└───────┘│
│Task3│          │┌───────┐┌───────┐│
├─────┤          ││Thread3││Thread4││
│Task4│          │└───────┘└───────┘│
├─────┤          └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘...

线程池常用方法

  • 执行无返回值的线程任务void execute(Runnable command);

  • 提交有返回值的线程任务Future<T> submit(Callable<T> task);

  • 关闭线程池void shutdown();shutdownNow();

  • 等待线程池关闭boolean awaitTermination(long timeout, TimeUnit unit);

执行线程任务

execute()只能提交Runnable类型的任务,没有返回值,而submit()既能提交Runnable类型任务也能提交Callable类型任务,可以返回Future类型结果,用于获取线程任务执行结果。

execute()方法提交的任务异常是直接抛出的,而submit()方法是捕获异常,当调用Futureget()方法获取返回值时,才会抛出异常。

线程池分类

Java标准库提供的几种常用线程池,创建这些线程池的方法都被封装到Executors工具类中。

  • FixedThreadPool线程数固定的线程池,使用Executors.newFixedThreadPool()创建;

  • CachedThreadPool线程数根据任务动态调整的线程池,使用Executors.newCachedThreadPool()创建;

  • SingleThreadExecutor:仅提供一个单线程的线程池,使用Executors.newSingleThreadExecutor()创建;

  • ScheduledThreadPool:能实现定时、周期性任务的线程池,使用Executors.newScheduledThreadPool()创建;

FixedThreadPool

public class Demo01 {public static void main(String[] args) throws InterruptedException {//1.获取线程池对象ExecutorService executorService= Executors.newFixedThreadPool(3);//2.提交任务给线程池对象executorService.execute(new MyRunnable("任务1"));executorService.execute(new MyRunnable("任务2"));executorService.execute(new MyRunnable("任务3"));Thread.sleep(100);  //主线程休眠executorService.execute(new MyRunnable("任务4"));//3.线程池的关闭方法  线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务;executorService.shutdown();
//        executorService.shutdownNow();/*当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。第一个参数指定的是时间,第二个参数指定的是时间单位(当前是秒)。返回值类型为boolean型。如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true。如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false。如果等待时间没有超过指定时间,则继续等待。*/while (!executorService.awaitTermination(1, TimeUnit.SECONDS)){System.out.println("还没关闭线程池");};System.out.println("已经关闭线程池");}
}class MyRunnable implements Runnable{public String str;public MyRunnable(String str) {this.str = str;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"  开始执行"+str);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
//        for (int i = 0; i < 10; i++) {
//            System.out.println(Thread.currentThread().getName()+":"+i);
//        }System.out.println(Thread.currentThread().getName()+"  结束"+str);}
}

CachedThreadPool

public class Demo02 {public static void main(String[] args) throws InterruptedException {//创建一个线程数量无上限的线程池对象ExecutorService executorService= Executors.newCachedThreadPool();for (int i = 1; i <= 100; i++) {executorService.execute(new MyRunnable("任务"+i));}Thread.sleep(2000);executorService.execute(new MyRunnable("最后一个任务"));executorService.shutdown();}
}

SingleThreadExecutor

public class Demo03 {public static void main(String[] args) {ExecutorService executorService= Executors.newSingleThreadExecutor();//循环的提交任务for (int i = 1; i < 10; i++) {executorService.execute(new MyRunnable("任务"+i));}executorService.shutdown();}
}

ScheduledThreadPool

public class Demo04 {public static void main(String[] args) {ScheduledExecutorService executorService= Executors.newScheduledThreadPool(2);Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"开始执行");try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+"执行结束");}};//2.提交任务  --延时多久进行任务的执行//schedule(Runnable command,long delay, TimeUnit unit)  任务对象  延迟时间  时间单位
//        executorService.schedule(runnable,2,TimeUnit.SECONDS);//scheduleAtFixedRate  延时1秒首次执行此任务,每隔2秒进行此任务的执行executorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);//上一次任务执行完毕后,等待固定的时间间隔,再进行下一次任务
//        executorService.scheduleWithFixedDelay(runnable, 2, 3, TimeUnit.SECONDS);}
}

自定义线程池

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式。jdkExecutor框架虽然提供了如newFixedThreadPool()newSingleThreadExecutor()newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过new ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

必须为线程池中的线程,按照业务规则,进行命名。可以在创建线程池时,使用自定义线程工厂规范线程命名方式,避免线程使用默认名称。

如何创建一个线程池?

public ThreadPoolExecutor(int corePoolSize,  核心线程数 >=0
int maximumPoolSize,   最大线程数(核心+临时) >corePoolSize
long keepAliveTime,    临时线程存活时间
TimeUnit unit,          存活的时间单位
BlockingQueue<Runnable> workQueue,    等待队列
ThreadFactory threadFactory,  线程工厂
RejectedExecutionHandler handler)   拒绝策略

拒绝策略
ThreadPoolExecutor.AbortPolicy()   默认拒绝策略--抛异常
ThreadPoolExecutor.DiscardOldestPolicy()  丢弃等待时间最久的任务
ThreadPoolExecutor.DiscardPolicy()  丢弃当前的任务
ThreadPoolExecutor.CallerRunsPolicy()  让当前的线程进行任务的执行

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());//提交任务
executorService.submit(new MyRunnable("任务1"));  //提交一个任务给executorService对象
executorService.submit(new MyRunnable("任务2"));
executorService.submit(new MyRunnable("任务3"));
executorService.submit(new MyRunnable("任务4"));  //核心线程已满,进入等待队列
executorService.submit(new MyRunnable("任务5"));
executorService.submit(new MyRunnable("任务6"));
executorService.submit(new MyRunnable("任务7"));   //核心线程已满,等待队列已满,进入临时线程
executorService.submit(new MyRunnable("任务8"));
executorService.submit(new MyRunnable("任务9"));   //核心线程已满,等待队列已满,临时线程已满,抛出异常executorService.shutdown();

自定义线程工厂与拒绝策略

自定义线程工厂
class MyThreadFactory implements ThreadFactory {//线程池名称前缀String namePrefix;public MyThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}//具备原子性的Integer类型private AtomicInteger threadNumber = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,namePrefix+"线程"+threadNumber.getAndIncrement());}
}

主要实现ThreadFactory接口中的newThread方法,我写的这个方法也就是对线程池与线程池中的线程进行了重命名

自定义拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("哎呦,达到能执行任务的上限了,(宝ᴗ宝)做不到啊");}
}

在这里呢我自定义了一个拒绝策略,当有一个线程进入到线程池后,核心线程,线程队列,临时线程等都没有空闲位置时,会直接输出一段话,线程不会进入线程池之中。

线程池的状态

线程池的状态分为:RUNNING , SHUTDOWN , STOP , TIDYING , TERMINATED

RUNNING:运行状态,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。该状态的线程池会接收新任务,并处理工作队列中的任务。

  •         调用线程池的shutdown()方法,可以切换到SHUTDOWN关闭状态;
  •         调用线程池的shutdownNow()方法,可以切换到STOP停止状态;

SHUTDOWN :关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;当工作队列为空时,并且线程池中执行的任务也为空时,线程池进入TIDYING状态;

STOP:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行 的任务;线程池中执行的任务为空,进入TIDYING状态;

TIDYING :整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0terminated()执行完毕,进入TERMINATED状态;

TERMINATED : 终止状态,该状态表示线程池彻底关闭。

线程池的优点

  1. 提高响应的速度
  2. 提高线程的可管理性
  3. 降低资源消耗(线程执行完任务,不销毁,可以执行其他的任务)

线程池核心代码

线程池相关的接口和实现类

线程池技术,是由2个核心接口和一组实现类组成。

Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。

ExecutorService接口继承了Executor接口,扩展了awaitTermination()submit()shutdown()等专门用于管理线程任务的方法。

ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。

AbstractExecutorService有两个常见的子类ThreadPoolExecutorForkJoinPool,用于实现不同的线程池。

ThreadPoolExecutor线程池通过Woker工作线程BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;

ForkJoinPool是一个基于分治思想的线程池实现类,通过分叉(fork)合并(join)的方式,将一个大任务拆分成多个小任务,并且为每个工作线程提供一个工作队列,减少竞争,实现并行的线程任务执行方式,所以ForkJoinPool适合计算密集型场景,是ThreadPoolExecutor线程池的一种补充。

ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,按照时间周期执行线程任务的线程池实现类,通常用于作业调度相关的业务场景。由于该线程池的工作队列使用DelayedWorkQueue,这是一个按照任务执行时间进行排序的优先级工作队列,所以这也是ScheduledThreadPoolExecutor线程池能按照时间周期来执行线程任务的主要原因。

工作线程Worker类

每个Woker类的对象,都代表线程池中的一个工作线程。

ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet<Worker> workders工作线程集合,统一由线程池进行管理。

通过阅读源代码,可以看出Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务Thread线程对象

当创建Worker工作线程时,会通过构造方法保存Runnable线程任务,同时使用ThreadFactory线程工厂,为该工作线程创建一个Thread线程对象。通过这样的操作,每个Worker工作线程对象,都将绑定一个真正的Thread线程。

另外,当Thread线程被JVM调度执行时,线程将会自动执行Worker工作线程对象的run()方法,通过调用runWorker()方法,最终实现Woker工作线程中所保存的Runnable线程任务的执行。

值得重视的是:当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用。

核心方法:execute()方法

ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。

完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:

  1. 首先,通过AtomicInteger类型的ctl对象,获取线程池的状态工作线程数

  2. 然后,判断当前线程池中的工作线程数

  3. 如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;

  4. 如果,工作线程的数量大于核心线程数,并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;

  5. 如果,workQueue工作队列缓存线程任务失败,代表工作队列已满。那么,线程池会重新通过addWorker()方法,尝试创建新的工作线程;

  6. 这次创建时,会判断工作线程数是否超出最大线程数。如果没有超出,会创建新的工作线程;如果已经超出,则返回false,代表创建失败;

  7. 如果创建失败,线程池执行拒绝策略

public class ThreadPoolExecutor 
{// 线程池执行Runnable线程任务public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池的状态和工作线程数int c = ctl.get();// 工作线程的数量小于核心线程数if (workerCountOf(c) < corePoolSize) {// 创建新的Worker工作线程if (addWorker(command, true))return;// 创建失败,重新获取线程池的状态和工作线程数c = ctl.get();}// 如果线程池处于RUNNING状态,缓存线程任务至工作队列if (isRunning(c) && workQueue.offer(command)) {// 任务缓存成功// 重新获取线程池的状态和工作线程数int recheck = ctl.get();// 如果线程池不是处于RUNNING状态,则删除任务if (! isRunning(recheck) && remove(command))// 执行拒绝策略reject(command);// 如果工作线程数等于零// 通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果缓存线程任务至工作队列// 尝试创建新的工作线程// 创建时,判断工作线程数是否超出最大线程数// 如果没有超出,创建成功// 如果已经超出,创建失败else if (!addWorker(command, false))// 执行拒绝策略reject(command);}
}

核心方法:addWorker()方法

execute()方法的执行过程中,会通过addWorker()方法创建一个工作线程,用于执行当前线程任务。

阅读源代码,会发现,这个方法的整个执行过程可以分为两个部分:检查线程池的状态和工作线程数量创建并执行工作线程

1  检查线程池的状态和工作线程数量

private boolean addWorker(Runnable firstTask, boolean core) {// 第1部分:检查线程池的状态和工作线程数量// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {// 通过ctl对象,获取当前线程池的运行状态int c = ctl.get();int rs = runStateOf(c);// 如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)// 则工作线程创建失败if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 检查工作线程数量for (;;) {// 通过ctl对象,获取当前线程池中工作线程数量int wc = workerCountOf(c);// 工作线程数量如果超出最大容量或者核心线程数(最大线程数)// 则工作线程创建失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;// 再次获取线程池状态,检查是否发生变化c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 第2部分:创建并执行工作线程....
}

2  创建并执行工作线程

private boolean addWorker(Runnable firstTask, boolean core) {// 第1部分:检查线程池的状态和工作线程数量....// 第2部分:创建并执行工作线程....boolean workerStarted = false; // 工作线程是否已经启动boolean workerAdded = false;   // 工作线程是否已经保存Worker w = null;try {// 创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);// 获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread;if (t != null) {// 获取线程池的ReentrantLock主锁对象// 确保在添加和启动线程时的同步与安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查线程池状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查Thread线程对象的状态是否已经处于启动状态if (t.isAlive()) throw new IllegalThreadStateException();// 保存工作线程workers.add(w);// 记录线程池曾经达到过的最大工作线程数量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 添加工作线程后,正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}// 返回线程启动状态return workerStarted;
}

http://www.lryc.cn/news/612201.html

相关文章:

  • 猫头虎AI分享:Claude Opus 新版 4.1 在 SWE-bench Verified 上准确率达到了 74.5%,在多文件代码重构方面表现突出
  • [AI 生成] 大数据数仓面试题
  • AI巨模型对决2025:五强争霸,谁能称王?
  • C++音视频流媒体开发面试题:音视频基础
  • 企业知识库:RAG技术实现流程总览(一)
  • 控制服务和守护进程-systemctl
  • C语言route命令详解:网络路由管理的核心工具
  • MaxKB 使用 MCP 连接 Oracle (免安装 cx_Oracle 和 Oracle Instant Client)
  • 搭建SAP S/4HANA虚拟机的安装与配置指南
  • 基于最大似然估计的卡尔曼滤波与自适应模糊PID控制的单片机实现
  • jdk动态代理如何实现
  • 力扣经典算法篇-45-回文数(数字处理:求余+整除,字符串处理:左右指针)
  • Unity笔记(二)——Time、Vector3、位置位移、角度、旋转、缩放、看向
  • 【历史人物】【范仲淹】简历与生平
  • 看不见的伪造痕迹:AI时代的鉴伪攻防战
  • NAT转化
  • 後端開發技術教學(二) 條件指令、循環結構、定義函數
  • 在 Visual Studio Code 中免费使用 Gemini 2.5 Pro API
  • 力扣面试150(48/150)
  • cacti
  • qt6 cmake vscode加载qrc图片资源
  • Milvus 向量数据库内存使用相关了解
  • 《第十篇》深入解析 `MilvusKBService`:基于 Milvus 的知识库服务实现
  • Vscode 解决 git插件Failed to connect to github.com port 443 connection timed out
  • FastAPI(未结束)
  • 实名认证 —— 腾讯云驾驶证识别接口
  • Spring_事务
  • docker相关操作记录
  • C语言控制语句练习题1
  • 记一次ORACLE ORA-00600 [19004] 错误的分析与解决方法