【1.7 漫画Java核心并发编程】
1.7 漫画Java核心并发编程
🎭 人物介绍
- 小明:对Java并发编程感兴趣的开发者
- 架构师老王:Java并发编程专家,精通各种并发工具
📚 Java并发编程基础
小明:“老王,Java并发编程为什么这么复杂?”
架构师老王:“因为并发编程需要处理多个线程同时访问共享资源的问题!主要挑战包括:线程安全、死锁、性能优化等。但掌握了核心原理,就能写出高效的并发程序。”
并发编程核心概念
Java并发编程体系|+-----------------+-----------------+| | |线程基础 同步机制 并发工具类| | |Thread/Runnable synchronized 线程池/Lock线程状态机 volatile CountDownLatchThreadLocal AQS原理 ConcurrentHashMap
🧵 Java多线程基础
线程创建和管理
/*** 线程创建的多种方式*/
public class ThreadCreationDemo {// 方式1:继承Thread类static class MyThread extends Thread {private String threadName;public MyThread(String name) {this.threadName = name;}@Overridepublic void run() {for (int i = 0; i < 5; i++) {System.out.println(threadName + " - Count: " + i);try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}}// 方式2:实现Runnable接口static class MyRunnable implements Runnable {private String taskName;public MyRunnable(String name) {this.taskName = name;}@Overridepublic void run() {System.out.println("Task " + taskName + " is running on " + Thread.currentThread().getName());}}// 方式3:使用Callable和Futurestatic class MyCallable implements Callable<String> {private String taskName;public MyCallable(String name) {this.taskName = name;}@Overridepublic String call() throws Exception {Thread.sleep(2000);return "Task " + taskName + " completed";}}public static void main(String[] args) throws Exception {// 创建和启动ThreadMyThread thread1 = new MyThread("Thread-1");thread1.start();// 使用RunnableThread thread2 = new Thread(new MyRunnable("Task-1"));thread2.start();// 使用CallableExecutorService executor = Executors.newSingleThreadExecutor();Future<String> future = executor.submit(new MyCallable("Callable-Task"));System.out.println("Result: " + future.get());executor.shutdown();}
}
线程状态机
/*** 线程状态演示*/
public class ThreadStateDemo {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(() -> {try {// RUNNABLE状态System.out.println("Thread is running...");// TIMED_WAITING状态Thread.sleep(2000);// BLOCKED状态 - 等待获取锁synchronized (ThreadStateDemo.class) {System.out.println("Thread got the lock");Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});System.out.println("1. Initial state: " + thread.getState()); // NEWthread.start();System.out.println("2. After start: " + thread.getState()); // RUNNABLEThread.sleep(500);System.out.println("3. During sleep: " + thread.getState()); // TIMED_WAITINGthread.join();System.out.println("4. After completion: " + thread.getState()); // TERMINATED}
}
🔒 synchronized同步机制
synchronized的使用方式
/*** synchronized的多种使用方式*/
public class SynchronizedDemo {private int count = 0;private static int staticCount = 0;private final Object lock = new Object();// 1. 同步实例方法public synchronized void incrementInstance() {count++;System.out.println("Instance count: " + count);}// 2. 同步静态方法public static synchronized void incrementStatic() {staticCount++;System.out.println("Static count: " + staticCount);}// 3. 同步代码块 - 对象锁public void incrementWithBlock() {synchronized (this) {count++;System.out.println("Block count: " + count);}}// 4. 同步代码块 - 自定义锁public void incrementWithCustomLock() {synchronized (lock) {count++;System.out.println("Custom lock count: " + count);}}// 5. 同步代码块 - 类锁public void incrementWithClassLock() {synchronized (SynchronizedDemo.class) {staticCount++;System.out.println("Class lock count: " + staticCount);}}
}
synchronized底层原理
/*** synchronized底层实现原理演示*/
public class SynchronizedPrinciple {private final Object monitor = new Object();// javap -v 查看字节码会看到:// monitorenter 和 monitorexit 指令public void synchronizedMethod() {synchronized (monitor) {// 临界区代码System.out.println("Critical section");}}// 对象头中的Mark Word存储锁信息public void explainObjectHeader() {/** 对象头结构(64位JVM):* |-----------------------------------------------------------------------|* | Object Header (128 bits) |* |-----------------------------------------------------------------------|* | Mark Word (64 bits) | Klass Word (64 bits) |* |-----------------------------------------------------------------------|* * Mark Word在不同锁状态下的格式:* |-------------------------------------------------------|--------------------|* | Mark Word (64 bits) | State |* |-------------------------------------------------------|--------------------|* | identity_hashcode:25| age:4| biased_lock:0| 01 | Normal |* |-------------------------------------------------------|--------------------|* | thread:54 | epoch:2 | age:4| biased_lock:1| 01 | Biased |* |-------------------------------------------------------|--------------------|* | ptr_to_lock_record:62 | 00 | Lightweight Locked |* |-------------------------------------------------------|--------------------|* | ptr_to_heavyweight_monitor:62 | 10 | Heavyweight Locked |* |-------------------------------------------------------|--------------------|* | | 11 | Marked for GC |* |-------------------------------------------------------|--------------------|*/}// 锁升级过程演示public void lockUpgradeDemo() {Object obj = new Object();// 1. 无锁状态 -> 偏向锁synchronized (obj) {System.out.println("First thread gets biased lock");}// 2. 偏向锁 -> 轻量级锁 (有竞争时)Thread thread1 = new Thread(() -> {synchronized (obj) {System.out.println("Thread1 gets lightweight lock");}});Thread thread2 = new Thread(() -> {synchronized (obj) {System.out.println("Thread2 gets lightweight lock");}});thread1.start();thread2.start();// 3. 轻量级锁 -> 重量级锁 (自旋失败时)}
}
⚡ volatile关键字详解
volatile的使用和原理
/*** volatile关键字演示*/
public class VolatileDemo {// volatile保证可见性private volatile boolean running = true;private volatile int count = 0;// 双重检查锁定模式中的volatileprivate volatile static VolatileDemo instance;public static VolatileDemo getInstance() {if (instance == null) {synchronized (VolatileDemo.class) {if (instance == null) {instance = new VolatileDemo();}}}return instance;}// volatile可见性演示public void visibilityDemo() {Thread writerThread = new Thread(() -> {try {Thread.sleep(1000);running = false; // 写操作System.out.println("Writer thread set running to false");} catch (InterruptedException e) {Thread.currentThread().interrupt();}});Thread readerThread = new Thread(() -> {while (running) { // 读操作// 忙等待}System.out.println("Reader thread detected running is false");});readerThread.start();writerThread.start();}// volatile禁止指令重排序private volatile boolean initialized = false;private int data = 0;public void initializeData() {data = 42; // 1initialized = true; // 2 - volatile写操作// 由于volatile的内存屏障,1一定在2之前执行}public void useData() {if (initialized) { // volatile读操作System.out.println("Data: " + data); // 一定能看到data=42}}// volatile内存语义public void memorySemantics() {/** volatile写操作:* 1. 将当前处理器缓存行的数据写回到系统内存* 2. 这个写回内存的操作会使其他处理器里缓存了该内存地址的数据无效* * volatile读操作:* 1. 从系统内存中读取数据* 2. 禁止处理器重排序优化* * 内存屏障:* - StoreStore屏障:禁止普通写和volatile写重排序* - StoreLoad屏障:禁止volatile写与volatile读/写重排序* - LoadLoad屏障:禁止volatile读与普通读重排序* - LoadStore屏障:禁止volatile读与普通写重排序*/}
}
🏗️ AQS(AbstractQueuedSynchronizer)详解
小明:“AQS是什么?为什么这么重要?”
架构师老王:“AQS是Java并发包的基础!它提供了一个框架来构建各种同步器,比如ReentrantLock、CountDownLatch、Semaphore等都是基于AQS实现的。”
AQS核心原理
/*** 自定义同步器演示AQS原理*/
public class CustomMutex extends AbstractQueuedSynchronizer {// 独占模式:只有一个线程能获得锁@Overrideprotected boolean tryAcquire(int acquires) {// 使用CAS操作尝试获取锁if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int releases) {// 释放锁if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0); // volatile写操作,确保可见性return true;}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}// 提供Condition支持public Condition newCondition() {return new ConditionObject();}// 对外提供的APIpublic void lock() {acquire(1);}public boolean tryLock() {return tryAcquire(1);}public void unlock() {release(1);}public boolean isLocked() {return isHeldExclusively();}
}
AQS队列机制
/*** AQS队列原理演示*/
public class AQSQueueDemo {public void explainAQSQueue() {/** AQS内部维护一个FIFO双向队列:* * +------+ prev +-----+ +-----+* | head | <------ | | <---- | tail |* +------+ +-----+ +-----+* | |* thread thread* * 节点类型:* - CANCELLED(1):线程已取消* - SIGNAL(-1):后继节点需要被唤醒* - CONDITION(-2):线程在条件队列中等待* - PROPAGATE(-3):共享模式下需要传播*/}// 模拟AQS的acquire过程public void simulateAcquire() {/** acquire流程:* 1. tryAcquire()尝试获取锁* 2. 失败则addWaiter()加入队列* 3. acquireQueued()在队列中等待* 4. 被唤醒后再次尝试获取锁*/}// 共享模式示例static class CustomSemaphore extends AbstractQueuedSynchronizer {public CustomSemaphore(int permits) {setState(permits);}@Overrideprotected int tryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}}@Overrideprotected boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) { // overflowthrow new Error("Maximum permit count exceeded");}if (compareAndSetState(current, next)) {return true;}}}public void acquire(int permits) throws InterruptedException {acquireSharedInterruptibly(permits);}public void release(int permits) {releaseShared(permits);}}
}
🔐 ReentrantLock详解
ReentrantLock vs synchronized
/*** ReentrantLock使用演示*/
public class ReentrantLockDemo {private final ReentrantLock lock = new ReentrantLock();private final ReentrantLock fairLock = new ReentrantLock(true); // 公平锁private final Condition condition = lock.newCondition();private int count = 0;// 基本使用public void increment() {lock.lock();try {count++;} finally {lock.unlock(); // 必须在finally中释放锁}}// 尝试获取锁public boolean tryIncrement() {if (lock.tryLock()) {try {count++;return true;} finally {lock.unlock();}}return false;}// 带超时的锁获取public boolean incrementWithTimeout() throws InterruptedException {if (lock.tryLock(1, TimeUnit.SECONDS)) {try {count++;return true;} finally {lock.unlock();}}return false;}// 可中断的锁获取public void incrementInterruptibly() throws InterruptedException {lock.lockInterruptibly();try {count++;} finally {lock.unlock();}}// Condition使用private boolean ready = false;public void waitForReady() throws InterruptedException {lock.lock();try {while (!ready) {condition.await(); // 等待条件}System.out.println("Ready to proceed");} finally {lock.unlock();}}public void setReady() {lock.lock();try {ready = true;condition.signalAll(); // 唤醒所有等待的线程} finally {lock.unlock();}}// 可重入性演示public void demonstrateReentrant() {lock.lock();try {System.out.println("First lock acquired");nestedMethod(); // 重入} finally {lock.unlock();}}private void nestedMethod() {lock.lock(); // 重入同一个锁try {System.out.println("Nested lock acquired");} finally {lock.unlock();}}// 公平锁 vs 非公平锁public void fairnessDemo() {System.out.println("Lock is fair: " + lock.isFair());System.out.println("Has queued threads: " + lock.hasQueuedThreads());System.out.println("Queue length: " + lock.getQueueLength());}
}
ReentrantLock内部实现
/*** ReentrantLock内部结构分析*/
public class ReentrantLockInternals {public void explainImplementation() {/** ReentrantLock内部有两个同步器实现:* * 1. NonfairSync - 非公平同步器* 2. FairSync - 公平同步器* * 都继承自Sync,而Sync继承自AQS*/}// 非公平锁实现原理public void nonfairLockPrinciple() {/** 非公平锁的tryAcquire实现:* * protected final boolean tryAcquire(int acquires) {* return nonfairTryAcquire(acquires);* }* * final boolean nonfairTryAcquire(int acquires) {* final Thread current = Thread.currentThread();* int c = getState();* if (c == 0) {* // 直接尝试CAS,不检查队列* if (compareAndSetState(0, acquires)) {* setExclusiveOwnerThread(current);* return true;* }* }* else if (current == getExclusiveOwnerThread()) {* // 重入逻辑* int nextc = c + acquires;* if (nextc < 0) // overflow* throw new Error("Maximum lock count exceeded");* setState(nextc);* return true;* }* return false;* }*/}// 公平锁实现原理public void fairLockPrinciple() {/** 公平锁的tryAcquire实现:* * protected final boolean tryAcquire(int acquires) {* final Thread current = Thread.currentThread();* int c = getState();* if (c == 0) {* // 关键区别:检查是否有前驱节点* if (!hasQueuedPredecessors() &&* compareAndSetState(0, acquires)) {* setExclusiveOwnerThread(current);* return true;* }* }* else if (current == getExclusiveOwnerThread()) {* int nextc = c + acquires;* if (nextc < 0)* throw new Error("Maximum lock count exceeded");* setState(nextc);* return true;* }* return false;* }*/}
}
🏊 线程池详解
小明:“线程池是如何工作的?为什么要使用线程池?”
架构师老王:“线程池可以复用线程,避免频繁创建和销毁线程的开销。它的核心是ThreadPoolExecutor,通过队列、核心线程数、最大线程数等参数来控制线程的生命周期。”
ThreadPoolExecutor详解
/*** 线程池使用和配置演示*/
public class ThreadPoolDemo {// 自定义线程池private final ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(2, // corePoolSize4, // maximumPoolSize 60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<>(10), // workQueuenew CustomThreadFactory(), // threadFactorynew ThreadPoolExecutor.CallerRunsPolicy() // rejectedExecutionHandler);// 自定义线程工厂static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix = "CustomPool-thread-";@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());thread.setDaemon(false);thread.setPriority(Thread.NORM_PRIORITY);return thread;}}// 线程池执行流程演示public void demonstrateExecution() {// 1. 提交任务给核心线程for (int i = 0; i < 2; i++) {customExecutor.execute(new Task("Core-" + i));}// 2. 核心线程满了,任务进入队列for (int i = 0; i < 10; i++) {customExecutor.execute(new Task("Queue-" + i));}// 3. 队列满了,创建非核心线程for (int i = 0; i < 2; i++) {customExecutor.execute(new Task("NonCore-" + i));}// 4. 超出最大线程数,执行拒绝策略try {customExecutor.execute(new Task("Rejected"));} catch (Exception e) {System.out.println("Task rejected: " + e.getMessage());}// 监控线程池状态printPoolStatus();}static class Task implements Runnable {private final String name;public Task(String name) {this.name = name;}@Overridepublic void run() {System.out.println("Task " + name + " executing on " + Thread.currentThread().getName());try {Thread.sleep(2000); // 模拟任务执行} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}// 监控线程池状态public void printPoolStatus() {System.out.println("=== Thread Pool Status ===");System.out.println("Core pool size: " + customExecutor.getCorePoolSize());System.out.println("Maximum pool size: " + customExecutor.getMaximumPoolSize());System.out.println("Current pool size: " + customExecutor.getPoolSize());System.out.println("Active threads: " + customExecutor.getActiveCount());System.out.println("Completed tasks: " + customExecutor.getCompletedTaskCount());System.out.println("Total tasks: " + customExecutor.getTaskCount());System.out.println("Queue size: " + customExecutor.getQueue().size());}// 不同类型的线程池public void differentExecutors() {// 1. 固定大小线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);// 2. 缓存线程池ExecutorService cachedThreadPool = Executors.newCachedThreadPool();// 3. 单线程池ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();// 4. 定时任务线程池ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);// 定时任务示例scheduledExecutor.scheduleAtFixedRate(() -> System.out.println("Scheduled task executed"),1, // initialDelay3, // periodTimeUnit.SECONDS);// 延迟任务scheduledExecutor.schedule(() -> System.out.println("Delayed task executed"),5,TimeUnit.SECONDS);}// 线程池最佳实践public void bestPractices() {/** 1. 合理设置核心线程数和最大线程数* - CPU密集型:核心线程数 = CPU核数 + 1* - IO密集型:核心线程数 = CPU核数 * 2* * 2. 选择合适的队列* - ArrayBlockingQueue:有界队列,防止内存溢出* - LinkedBlockingQueue:无界队列,注意内存使用* - SynchronousQueue:直接传递,适合缓存线程池* * 3. 设置合理的拒绝策略* - AbortPolicy:直接抛异常(默认)* - CallerRunsPolicy:调用者执行* - DiscardPolicy:直接丢弃* - DiscardOldestPolicy:丢弃最老的任务* * 4. 监控线程池状态* 5. 优雅关闭线程池*/}// 优雅关闭线程池public void gracefulShutdown() {customExecutor.shutdown(); // 不再接受新任务try {// 等待已提交任务完成if (!customExecutor.awaitTermination(60, TimeUnit.SECONDS)) {customExecutor.shutdownNow(); // 强制关闭// 等待任务响应中断if (!customExecutor.awaitTermination(60, TimeUnit.SECONDS)) {System.err.println("Thread pool did not terminate");}}} catch (InterruptedException e) {customExecutor.shutdownNow();Thread.currentThread().interrupt();}}
}
ForkJoinPool工作窃取
/*** ForkJoinPool和工作窃取演示*/
public class ForkJoinDemo {private final ForkJoinPool forkJoinPool = new ForkJoinPool();// 分治任务示例:计算数组和static class SumTask extends RecursiveTask<Long> {private final int[] array;private final int start;private final int end;private static final int THRESHOLD = 1000;public SumTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 任务足够小,直接计算long sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else {// 任务太大,分割成子任务int mid = start + (end - start) / 2;SumTask leftTask = new SumTask(array, start, mid);SumTask rightTask = new SumTask(array, mid, end);// 异步执行左任务leftTask.fork();// 同步执行右任务Long rightResult = rightTask.compute();// 等待左任务完成Long leftResult = leftTask.join();return leftResult + rightResult;}}}public void demonstrateForkJoin() {int[] array = new int[10000];for (int i = 0; i < array.length; i++) {array[i] = i + 1;}SumTask task = new SumTask(array, 0, array.length);Long result = forkJoinPool.invoke(task);System.out.println("Sum: " + result);System.out.println("Parallelism: " + forkJoinPool.getParallelism());System.out.println("Active threads: " + forkJoinPool.getActiveThreadCount());}// 使用Stream的并行处理(底层使用ForkJoinPool)public void parallelStream() {List<Integer> numbers = IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList());// 并行计算long sum = numbers.parallelStream().mapToLong(Integer::longValue).sum();System.out.println("Parallel sum: " + sum);}
}
🤔 常见面试问题
Q1: synchronized和ReentrantLock的区别?
架构师老王的回答:
特性 | synchronized | ReentrantLock |
---|---|---|
使用方式 | 关键字,自动释放 | API调用,手动释放 |
功能 | 基础锁功能 | 更丰富的功能 |
性能 | JDK6后优化,差距很小 | 略优 |
公平性 | 非公平 | 支持公平锁 |
中断响应 | 不支持 | 支持 |
超时 | 不支持 | 支持 |
条件变量 | 单个(wait/notify) | 多个Condition |
Q2: 什么是AQS?它是如何工作的?
核心机制:
// AQS核心思想
public abstract class AbstractQueuedSynchronizer {// 同步状态private volatile int state;// 等待队列头节点private transient volatile Node head;// 等待队列尾节点private transient volatile Node tail;// 子类需要实现的方法protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
}
Q3: 线程池的工作原理?
执行流程:
- 任务提交时,如果核心线程数未满,创建核心线程执行
- 核心线程满了,任务加入队列等待
- 队列满了,创建非核心线程执行
- 达到最大线程数,执行拒绝策略
🎯 总结
小明:“老王,Java并发编程的知识点真的很多!”
架构师老王:"是的,但这些是构建高性能Java应用的基础!记住几个要点:
- 理解内存模型 - volatile、synchronized的原理
- 掌握AQS - 大部分并发工具的基础
- 合理使用锁 - 根据场景选择synchronized或Lock
- 善用线程池 - 避免频繁创建线程的开销
并发编程的核心是:安全性、活跃性、性能的平衡!"
掌握这些核心概念,你就能写出高效、安全的并发程序! 🚀