当前位置: 首页 > news >正文

JavaEE初阶第十期:解锁多线程,从 “单车道” 到 “高速公路” 的编程升级(八)

专栏:JavaEE初阶起飞计划

个人主页:手握风云

目录

一、多线程案例

1.1. 阻塞队列


一、多线程案例

1.1. 阻塞队列

  • 概念

        阻塞队列是一种特殊的队列,也遵守“先进先出”的规则。在Java标准库大部分的类是线程不安全的,阻塞队列是一种线程安全的数据结构。队列满时:插入操作会阻塞,直到队列有空余空间。队列空时:取出操作会阻塞,直到队列有新元素。

  • 生产者消费者模型

        生产者消费者模型是一种通过阻塞队列来解决生产者和消费者之间强耦合问题的开发模型。在该模型中,生产者和消费者不直接进行通讯,而是通过阻塞队列作为中间容器传递数据:生产者生产完数据后,直接将数据放入阻塞队列,无需等待消费者处理;消费者则从阻塞队列中获取数据进行处理,无需主动向生产者索取数据。

        我们拿包饺子来说,基本可以简化为擀饺子皮和包饺子两个过程。一种包法:每个人都进行这两个步骤一起包,相比于一个人包,效率就会快很多,但多个人同时针对擀面杖进行竞争,就会造成阻塞。另一种包法:一个人擀饺子皮,剩下的人包饺子。那么此时针对饺子皮这个资源,擀饺子皮的人就是生产者,包饺子的人就是消费者。针对生产者和消费者的定位,具体还要看资源。

        阻塞队列用于协调多个线程之间的工作。如果擀饺子皮的人速度较慢,那么包饺子的人就需要阻塞等待;如果擀饺子皮的人速度较快,盖帘上已经没有位置,也需要阻塞等待。

        引入生产者消费者模型的目的是为了减少锁竞争,生产者和消费者的步调不一定完全一致,此时阻塞队列就可以起到协调的效果。

        优点:1.平衡处理能力(削峰填谷);2.解耦生产者与消费者;3.支持并发协作。

        缺点:1.增加系统复杂性;2.资源开销;3.潜在的性能瓶颈。

  • 标准库中的阻塞队列

        BlockingQueue是一个接口,真正实现的类是LinkedBlockQueue和ArrayBlockQueue。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(10); // 基于顺序表实现}
}

        put 方法用于阻塞式的入队列,take 用于阻塞式的出队列。BlockingQueue 也有 offer, poll, peek 等方法,但是这些方法不带有阻塞特性。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}

        由于也会触发阻塞异常,也需要抛出。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");queue1.take();}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");queue1.put("abc");System.out.println("添加了一个元素");}
}

        当放入了3个元素之后,就会阻塞等待。如果阻塞队列为空,出元素也会阻塞。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(3);// 基于链表实现queue1.take();}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;/*** @author gao* @date 2025/7/20 21:05*/public class Demo2 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);Thread producer = new Thread(() -> {int count = 0;try {while (true) {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});Thread consumer = new Thread(() -> {try {while (true) {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();producer.join();consumer.join();}
}

        可以看到生产一个元素就会消费一个元素,如果我们删除掉consumer线程里的休眠时间,运行结果没什么区别。但如果删除掉producer线程里面的休眠时间,会发现只消费了一个元素,就会产生阻塞,当生产了1000个元素后,才会消费。

  • 阻塞队列的实现
// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;public MyBlockingQueue(int capacity) {array = new String[capacity];}public void put(String elem) {}public String take() {}
}

        我们先不考虑阻塞,当向队列里面添加元素时,tail就向后移动,到达队尾时,就重新回到起始位置。同理,获取队列元素时,就让head向后移动。


/**** @param elem 入队列*/
public void put(String elem) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;
}/**** @return 获取队列元素*/
public String take() {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;
}

        接下来需要考虑线程安全:两个线程操作同一个队列时,不会出现bug。我们观察上面的代码,两个线程都是针对同一个变量(size和array中的同一下标)进行修改,修改操作不是原子的,整体代码都进行加锁是比较安全的。并且还需要注意,必须使用同一个锁对象。

private Object locker1 = new Object();
public MyBlockingQueue(int capacity) {array = new String[capacity];
}/**** @param elem 入队列*/
public void put(String elem) {synchronized (locker1) {if (size >= array.length) {return;}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;}
}/**** @return 获取队列元素*/
public String take() {synchronized (locker1) {if (size == 0) {// 队列为空,先不考虑阻塞return null;}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;return ret;}
}

        接下来需要考虑阻塞:当队列满的时候,主动进入阻塞,并且还需要放进锁里面,当有元素出队列时,就需要唤醒;同理,当队列为空时,阻塞,有元素进队列时,唤醒。

/**** @param elem 入队列*/
public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}
}/**** @return 获取队列元素*/
public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}
}

        完整代码实现:

// 为了简单,这里不加泛型参数
// 假定初始元素类型为String
class MyBlockingQueue {// 利用数组实现private String[] array = null;private int head = 0;private int tail = 0;private int size = 0;private Object locker1 = new Object();public MyBlockingQueue(int capacity) {array = new String[capacity];}/**** @param elem 入队列*/public void put(String elem) throws InterruptedException {synchronized (locker1) {if (size >= array.length) {locker1.wait();}array[tail] = elem;tail++;if (tail >= array.length) {tail = 0;}size++;locker1.notify();}}/**** @return 获取队列元素*/public String take() throws InterruptedException {synchronized (locker1) {if (size == 0) {locker1.wait();}String ret = array[head];head++;if (head >= array.length) {head = 0;}size--;// 唤醒put方法里的阻塞locker1.notify();return ret;}}
}public class Demo3 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread producer = new Thread(() -> {int count = 0;while (true) {try {queue.put("" + count);System.out.println("生产了一个元素:" + count);count++;Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});Thread consumer = new Thread(() -> {while (true) {try {String elem = queue.take();System.out.println("消费了一个元素:" + elem);Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();consumer.start();}
}

        上面是wait()方法的源码,官方文档建议使用while()循环来判断条件。正常来说,必须等待条件被打破了,才能被唤醒,如果是其他代码,不排除唤醒之后还会出现,条件仍然成立的可能性,那么后面的代码执行就会出现问题。while相当于“二次确认”来保证条件确实不成立。

http://www.lryc.cn/news/594573.html

相关文章:

  • 经典神经网络(vgg resnet googlenet)
  • LiteCoT:难度感知的推理链压缩与高效蒸馏框架
  • Apache IoTDB(2):时序数据库 IoTDB 集群安装部署的技术优势与适用场景分析
  • 卫朋:华为流程体系拆解系列之高阶流程L1-L3分解三阶七步法
  • 深入详解随机森林在放射治疗计划优化中的应用及实现细节
  • 【Elasticsearch】BM25的discount_overlaps参数
  • Qt中的网络通信
  • Lua:小巧而强大的脚本语言,游戏与嵌入式的秘密武器
  • 搭建前端页面,介绍对应标签
  • wordle game(猜词游戏)小demo【react + ts】
  • 搭建种草商城框架指南
  • Protein FID:AI蛋白质结构生成模型评估新指标
  • MCP协议解析:如何通过Model Context Protocol 实现高效的AI客户端与服务端交互
  • 基础神经网络模型搭建
  • 【Linux】3. Shell语言
  • 双8无碳小车“cad【17张】三维图+设计说名书
  • XTTS实现语音克隆:精确控制音频格式与生成流程【TTS的实战指南】
  • XSS GAME靶场
  • 仙盟数据库应用-外贸标签打印系统 前端数据库-V8--毕业论文-—-—仙盟创梦IDE
  • Apache基础配置
  • ESMFold 安装教程
  • 深度相机的工作模式(以奥比中光深度相机为例)
  • 近期工作感想:职业规划篇
  • 【RAG Agent】Deep Searcher实现逻辑解析
  • 尚庭公寓--------登陆流程介绍以及功能代码
  • Linux:线程控制
  • API获取及调用(以豆包为例实现图像分析)
  • 《计算机网络》实验报告三 UDP协议分析
  • 单线程 Reactor 模式
  • 【PyTorch】图像二分类项目