当前位置: 首页 > article >正文

JUC入门(四)

ReadWriteLock

代码示例:

package com.yw.rw;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteDemo {public static void main(String[] args) {MyCache myCache = new MyCache();//只做写入操作for (int i = 1;i<=5;i++){final int temp = i;new Thread(()->{myCache.put(temp+"",temp+"");},String.valueOf(i)).start();}//只做读取操作for (int i = 1;i<=5;i++){final int temp = i;new Thread(()->{myCache.get(temp+"");},String.valueOf(i)).start();}}
}//自定义缓存
class MyCache{private volatile Map<String,Object> map  = new HashMap<>();//读写锁:更加细粒度的操作ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();//存,写public void put(String key,Object value){reentrantReadWriteLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName() + "写入" + key);map.put(key, value);System.out.println(Thread.currentThread().getName() + "写入OK" );} catch (Exception e) {throw new RuntimeException(e);} finally {reentrantReadWriteLock.writeLock().unlock();}}//取,读public void get(String key){reentrantReadWriteLock.readLock().lock();try {System.out.println(Thread.currentThread().getName() + "读取" + key);Object o = map.get(key);System.out.println(Thread.currentThread().getName() + "读取OK" );} catch (Exception e) {throw new RuntimeException(e);} finally {reentrantReadWriteLock.readLock().unlock();}}
}

原理

核心概念

ReentrantReadWriteLock 的核心是区分读锁和写锁:

  • 读锁(ReadLock:允许多个读线程同时访问共享资源。
  • 写锁(WriteLock:写线程访问共享资源时,会独占资源,不允许其他读线程或写线程访问。

这种锁的设计基于以下原则:

  • 读读共享:多个读线程可以同时读取共享资源。
  • 读写互斥:读线程和写线程不能同时访问共享资源。
  • 写写互斥:写线程之间也不能同时访问共享资源。
主要方法

ReentrantReadWriteLock 提供了读锁和写锁的接口,分别通过 readLock()writeLock() 获取。

  • ReentrantReadWriteLock():构造方法,创建一个默认的读写锁。
  • ReentrantReadWriteLock(boolean fair):构造方法,指定是否为公平锁。如果为 true,则按照线程请求锁的顺序分配锁。
  • Lock readLock():获取读锁。
  • Lock writeLock():获取写锁。

读锁和写锁都实现了 Lock 接口,因此可以使用 Lock 接口提供的方法,例如:

  • void lock():获取锁。
  • void unlock():释放锁。
  • boolean tryLock():尝试获取锁,如果当前没有锁,则立即返回 true;否则返回 false
  • boolean tryLock(long timeout, TimeUnit unit):尝试获取锁,直到超时。
底层实现

ReentrantReadWriteLock 的底层实现也是基于 AQS(AbstractQueuedSynchronizer)。AQS 提供了一个共享锁的框架,ReentrantReadWriteLock 利用这个框架实现了读写锁的机制。

  • 状态表示:AQS 的状态(state)被用来表示读锁和写锁的持有情况。状态的高 16 位表示读锁的持有数量,低 16 位表示写锁的持有数量。
  • 锁的获取与释放
  1. 读锁:当一个线程尝试获取读锁时,会检查写锁是否被其他线程持有。如果没有写锁被持有,或者写锁被当前线程持有(可重入),则允许读锁的获取。
  2. 写锁:当一个线程尝试获取写锁时,会检查是否有其他线程持有读锁或写锁。如果没有其他线程持有锁,或者写锁已经被当前线程持有(可重入),则允许写锁的获取。
工作流程
  1. 初始化:创建 ReentrantReadWriteLock 对象。
  2. 获取读锁:读线程调用 readLock().lock(),尝试获取读锁。
  3. 获取写锁:写线程调用 writeLock().lock(),尝试获取写锁。
  4. 释放锁:线程完成操作后,调用 unlock() 方法释放锁。

阻塞队列

 可见它与list,set处于同一层级

由此可见

BlockingQueue 不是新的东西

什么时候我们会用到阻塞队列?多线程并发处理,线程池!

BlockingQueue的四组常用API

方式抛出异常不会抛出异常,有返回值阻塞 等待超时等待
添加addofferputoffer(..)
移除removepolltakepoll(..)
判断队列首elementpeek

抛出异常

package com.yw.bq;import java.util.concurrent.ArrayBlockingQueue;public class Demo1 {public static void main(String[] args) {//这里的参数3表示队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));System.out.println(blockingQueue.add("d"));System.out.println("=======================");//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());}
}

我们的队列大小为3,此时如果再添加一个元素,就会抛出异常

如果队列为空,如果继续抛出,依旧会抛出异常

package com.yw.bq;import java.util.concurrent.ArrayBlockingQueue;public class Demo1 {public static void main(String[] args) {//这里的参数3表示队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.add("d"));System.out.println("=======================");System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());}
}

 有返回值,不抛出异常

package com.yw.bq;import java.util.concurrent.ArrayBlockingQueue;public class Demo1 {public static void main(String[] args) {//这里的参数3表示队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("d"));System.out.println("=======================");System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());}
}

 等待,阻塞

package com.yw.bq;import java.util.concurrent.ArrayBlockingQueue;public class Demo1 {public static void main(String[] args) throws InterruptedException {//这里的参数3表示队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");
//        blockingQueue.put("d");System.out.println("=======================");System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());}
}

超时等待 

package com.yw.bq;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Demo1 {public static void main(String[] args) throws InterruptedException {//这里的参数3表示队列的大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.offer("a");blockingQueue.offer("b");blockingQueue.offer("c");
//设置超时时间和单位blockingQueue.offer("d", 2,TimeUnit.SECONDS);System.out.println("=======================");blockingQueue.poll();blockingQueue.poll();blockingQueue.poll();
//设置超时时间和单位blockingQueue.poll(2,TimeUnit.SECONDS);}
}

原理

BlockingQueue 是 Java 并发包(java.util.concurrent)中提供的一种线程安全的队列接口,用于在多线程环境中协调线程之间的通信和同步。它支持在队列为空时阻塞插入操作,或者在队列满时阻塞移除操作,从而简化了线程间的数据共享和通信。
核心概念

BlockingQueue 的核心是一个线程安全的队列,支持以下两种主要操作:

  • 插入操作:将元素放入队列。如果队列已满,插入操作会被阻塞,直到有空间可用。
  • 移除操作:从队列中取出元素。如果队列为空,移除操作会被阻塞,直到有元素可用。

此外,BlockingQueue 还提供了非阻塞操作(如 offer()poll())和带超时的阻塞操作(如 offer(E e, long timeout, TimeUnit unit)poll(long timeout, TimeUnit unit))。

线程安全机制

BlockingQueue 的实现类通过以下方式保证线程安全:

  • 锁机制:使用内部锁(如 ReentrantLock)来保护队列的插入和移除操作。
  • 条件变量:使用条件变量(如 Condition)来实现阻塞和唤醒机制。例如,当队列为空时,移除操作会等待“非空”条件;当队列满时,插入操作会等待“非满”条件。

阻塞机制

BlockingQueue 的阻塞机制基于条件变量和锁:

插入阻塞
  • 当队列满时,调用 put() 方法的线程会被阻塞。
  • 线程会被放入“非满”条件的等待队列中,直到有空间可用。
  • 当有线程调用 take()poll() 从队列中移除元素后,会唤醒“非满”条件上的一个或多个线程。
 移除阻塞
  • 当队列为空时,调用 take() 方法的线程会被阻塞。
  • 线程会被放入“非空”条件的等待队列中,直到有元素可用。
  • 当有线程调用 put() 向队列中插入元素后,会唤醒“非空”条件上的一个或多个线程。

工作流程

初始化
  • 创建 BlockingQueue 实例时,可以指定容量(对于有界队列)。
  • 初始化锁和条件变量。
插入操作
  • 生产者线程调用 put() 方法。
  • 如果队列未满,插入元素并唤醒等待在 notEmpty 条件上的线程。
  • 如果队列已满,当前线程被阻塞,直到有空间可用。
移除操作
  • 消费者线程调用 take() 方法。
  • 如果队列非空,移除元素并唤醒等待在 notFull 条件上的线程。
  • 如果队列为空,当前线程被阻塞,直到有元素可用。
关键点
锁分离

LinkedBlockingQueue 使用两个锁(putLocktakeLock)分别保护插入和移除操作,减少了锁竞争,提高了并发性能。

条件变量

使用条件变量实现阻塞和唤醒机制,确保线程在合适的时间被唤醒。

线程安全

所有操作都是线程安全的,无需额外的同步措施。

阻塞与超时

提供了阻塞操作(如 put()take())和带超时的阻塞操作(如 offer(E e, long timeout, TimeUnit unit)poll(long timeout, TimeUnit unit)),方便在不同场景下使用。

总结

BlockingQueue 的原理基于锁和条件变量,通过阻塞和唤醒机制实现线程间的同步。它的实现类(如 LinkedBlockingQueue)通过分离锁和条件变量,减少了锁竞争,提高了并发性能。这种设计使得 BlockingQueue 非常适合用于生产者-消费者模型和其他需要线程间同步的场景。

SynchronousQueue 同步队列

没用容量

进去一个元素,必须等取出来之后,才能继续放下一个元素

代码演示:

package com.yw.bq;import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;public class SynchronousQueueDemo {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();new Thread(()->{try {System.out.println(Thread.currentThread().getName() + "put 1");queue.put("1");TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "put 2");queue.put("2");TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "put 3");queue.put("3");} catch (InterruptedException e) {throw new RuntimeException(e);}},"T1").start();new Thread(()->{try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "get " + queue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "get " + queue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "get " + queue.take());TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}},"T2").start();}
}

原理

SynchronousQueue 是 Java 并发包(java.util.concurrent)中提供的一种特殊的阻塞队列,它的独特之处在于它不存储元素。也就是说,它不能像其他阻塞队列(如 ArrayBlockingQueueLinkedBlockingQueue)那样在内部缓存数据。相反,SynchronousQueue 的作用更像是一个“直接传递”的通道,生产者线程必须等待消费者线程准备好接收数据,反之亦然。

核心概念

SynchronousQueue 的核心在于:

  • 直接传递:生产者线程将数据传递给消费者线程时,必须直接将数据交给消费者,而不能将数据存储在队列中。
  • 一对一匹配:每个插入操作(put)必须等待一个移除操作(take),反之亦然。生产者和消费者必须“同步”操作。
主要方法

SynchronousQueue 实现了 BlockingQueue 接口,因此提供了以下常用方法:

  • void put(E e):将一个元素放入队列。如果当前没有消费者线程等待接收数据,则阻塞。
  • E take():从队列中移除并返回一个元素。如果当前没有生产者线程等待传递数据,则阻塞。
  • boolean offer(E e):尝试将一个元素放入队列,如果当前有消费者线程等待,则成功传递并返回 true;否则返回 false
  • E poll():尝试从队列中移除并返回一个元素,如果当前有生产者线程等待,则成功接收并返回数据;否则返回 null
底层实现

 SynchronousQueue 的底层实现基于 AQS(AbstractQueuedSynchronizer),它通过共享锁的方式实现线程间的同步。

  • 锁机制SynchronousQueue 使用一个共享锁来协调生产者和消费者线程。
  • 队列结构:虽然它不存储元素,但它维护了两个队列:
  • 匹配机制
  1. 当一个生产者线程调用 put() 方法时,它会被放入生产者队列,并等待消费者线程调用 take()

  2. 当一个消费者线程调用 take() 方法时,它会被放入消费者队列,并等待生产者线程调用 put()

  3. 一旦生产者和消费者匹配成功,数据就会直接从生产者传递给消费者,然后双方线程都会被唤醒并继续执行。

工作流程
1、生产者线程调用 put()
  • 如果当前有消费者线程在等待(调用了 take()),则直接将数据传递给消费者,双方线程都被唤醒。
  • 如果没有消费者线程等待,则生产者线程被阻塞,放入生产者队列。

2、消费者线程调用 take()

  • 如果当前有生产者线程在等待(调用了 put()),则直接从生产者接收数据,双方线程都被唤醒。
  • 如果没有生产者线程等待,则消费者线程被阻塞,放入消费者队列。

3、匹配成功

  • 当生产者和消费者线程匹配成功后,数据直接从生产者传递给消费者,双方线程继续执行。
总结

SynchronousQueue 是一种特殊的阻塞队列,它通过直接传递数据的方式实现生产者和消费者之间的同步。它的设计目标是减少数据在队列中的停留时间,提高数据传递的效率。虽然它不适合需要缓存数据的场景,但在直接传递数据或对延迟要求较高的场景中非常有用。

http://www.lryc.cn/news/2379846.html

相关文章:

  • 【HarmonyOS 5】鸿蒙mPaaS详解
  • 多线BGP服务器优化实践与自动化运维方案
  • 无法加载文件 E:\Program Files\nodejs\npm.ps1,因为在此系统上禁止运行脚本
  • 【C++模板与泛型编程】实例化
  • TB开拓者策略交易信号闪烁根因及解决方法
  • 什么是RDMA?
  • C++面试3——const关键字的核心概念、典型场景和易错陷阱
  • ASIC和FPGA,到底应该选择哪个?
  • 【C++】嵌套类访问外部类成员
  • mac下载、使用mysql
  • java Lombok 对象模版和日志注解
  • Python学习笔记--使用Django操作mysql
  • win11下,启动springboot时,提示端口被占用的处理方式
  • 计算机视觉设计开发工程师学习路线
  • AI大模型从0到1记录学习numpy pandas day25
  • Opencv C++写中文(来自Gemini)
  • 下载和导出文件名称乱码问题
  • STM32实战指南:DHT11温湿度传感器驱动开发与避坑指南
  • 【android bluetooth 协议分析 01】【HCI 层介绍 8】【ReadLocalVersionInformation命令介绍】
  • esp32课设记录(四)摩斯密码的实现 并用mqtt上传
  • 「HHT(希尔伯特黄变换)——ECG信号处理-第十三课」2025年5月19日
  • 前端(vue)学习笔记(CLASS 6):路由进阶
  • GPT-4.1特点?如何使用GPT-4.1模型,GPT-4.1编码和图像理解能力实例展示
  • 使用Python和FastAPI构建网站爬虫:Oncolo医疗文章抓取实战
  • 写一段图片平移的脚本
  • 【C++】哈希的概念与实现
  • Yocto和Buildroot功能和区别
  • 物联网数据湖架构
  • 详解RabbitMQ工作模式之发布订阅模式
  • 什么是子网委派?