下面是详细讲解“Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)”的完整攻略。
1. 前言
本文主要分析 Java 并发包中最重要的实现类之一:AbstractQueuedSynchronizer
,并以此为引子学习 Java 中的锁与并发编程。具体地,我们会讨论以下几个问题:
AbstractQueuedSynchronizer
的实现原理ReentrantLock
和ReentrantReadWriteLock
是如何基于AbstractQueuedSynchronizer
实现的- 什么是条件队列(Condition Queue),以及它和锁、同步器的关系
- 如何避免锁的不可重入性
2. AbstractQueuedSynchronizer
概述
AbstractQueuedSynchronizer
(简称 AQS
)是 Java 并发包中最基础、最重要、也是最复杂的类之一。在 Java 中,AQS
用于实现锁、同步器、阻塞队列等用于并发编程的基本组件。例如,ReentrantLock
、ReentrantReadWriteLock
、Semaphore
, CountDownLatch
等常见的类实现都基于 AQS
。
AQS
的主要目的是提供基于阻塞的锁和相关同步器的实现。具体地,AQS
客户端使用该同步器的代码应该最好是基于下面这个框架:
class MyMutex {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int acquires) {
return compareAndSetState(0, 1);
}
protected boolean tryRelease(int releases) {
return compareAndSetState(1, 0);
}
protected int tryAcquireShared(int acquires) {
return 0;
}
protected boolean tryReleaseShared(int releases) {
return true;
}
}
}
在这个例子中,MyMutex
其实就是想要实现一种互斥锁(mutex)。sync
变量则是 AQS
告诉我们,其内部实现需要维护一个同步状态。在这个例子中,Sync
静态内部类继承了 AQS
,并同时定义了 tryAcquire
和 tryRelease
方法来实现简单的 semaphore 或者 lock 功能。注意到这些方法都是已经实现好的模板方法,因此,子类实现的任务就是调用这些方法并定义合适的状态值来描述 Sync
对象。
3. ReentrantLock
和 ReentrantReadWriteLock
ReentrantLock
和 ReentrantReadWriteLock
的实现都基于 AQS
。ReentrantLock
提供了比 Java 语言内部的 synchronized 代码块更加灵活的并发控制,而 ReentrantReadWriteLock
允许多个线程同时读取共享数据,但是对于写操作则进行排它控制,以此来提高系统并发吞吐量并减小锁的竞争。
代码示例:
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static int counter = 0;
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
class CountThread extends Thread {
@Override public void run() {
for (int i = 0; i < 100_000; ++i) {
addOne();
}
}
}
Thread t1 = new CountThread();
Thread t2 = new CountThread();
Thread t3 = new CountThread();
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ie) {
System.err.println("ERROR: " + ie.getMessage());
}
System.out.println(counter);
}
private static void addOne() {
lock.lock();
try {
++counter;
} finally {
lock.unlock();
}
}
}
4. 条件队列(Condition Queue)
除了实现锁和同步器的基本功能之外,AQS
在 Java 中还是一个轻量级的阻塞队列的实现。这个队列本质上是由一个链表数据结构组成的,用于保存需要被阻塞的线程:
另外,AQS
还实现了一个非常重要的接口:java.util.concurrent.locks.Condition
。Condition 接口提供了让一个线程在满足某个条件时才能往前执行的功能,也就是允许线程之间进行通信(协调)。具体来说,它可以替代在 Object 类中定义的 wait()/notify() 的使用,以此更好地控制休眠中的线程、目的地唤醒线程。
通过 Condition 接口,我们可以为某个 Lock 对象创建 Condition 对象来进行同步协作,例如:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static int counter = 0;
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
class CountThread extends Thread {
@Override public void run() {
for (int i = 0; i < 100_000; ++i) {
addOne();
}
}
}
Thread t1 = new CountThread();
Thread t2 = new CountThread();
Thread t3 = new CountThread();
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ie) {
System.err.println("ERROR: " + ie.getMessage());
}
System.out.println(counter);
}
private static void addOne() {
lock.lock();
try {
++counter;
if (counter % 1_000_000 == 0) {
System.out.println(counter + " Items Added!");
condition.signalAll();
}
} finally {
lock.unlock();
}
}
}
在这个例子中,我们利用了 ReentrantLock 的 condition 对象来实现阻塞的效果。睡眠的线程会在其它线程发出 signal() 或 signalAll() 命令时唤醒。
5. 避免锁的不可重入性
一个锁如果是不可重入(non-reentrant)的,那么同一个线程多次去 acquire() 这个锁,就会卡在那里,进入死锁状态。下面是一个锁的实现,它在不正确进行重入时会卡死:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Lock {
private AtomicBoolean isLocked = new AtomicBoolean(false);
public void lock() {
while (isLocked.compareAndSet(false, true)) {
// busy waiting
}
}
public void unlock() {
isLocked.set(false);
}
}
public class Main {
private static Lock lock = new Lock();
private static int counter = 0;
public static void main(String[] args) {
class CountThread extends Thread {
@Override public void run() {
for (int i = 0; i < 100_000; ++i) {
lock.lock();
try {
lock.lock(); // 锁的不当重入
++counter;
} finally {
lock.unlock();
}
}
}
}
Thread t1 = new CountThread();
Thread t2 = new CountThread();
Thread t3 = new CountThread();
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ie) {
System.err.println("ERROR: " + ie.getMessage());
}
System.out.println(counter);
}
}
在这个例子中,我们意外地锁在了自己持有的锁上,导致了不可重入状态。为了解决这个问题,我们需要引入同步器层次结构(hierarchical structure of synchronizers)。此外,我们也需要使用 ReentrantLock 自己的机制来进行锁的重入。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static ReentrantLock lock = new ReentrantLock();
private static int counter = 0;
public static void main(String[] args) {
class CountThread extends Thread {
@Override public void run() {
for (int i = 0; i < 100_000; ++i) {
lock.lock();
try {
lock.lock(); // 正确的重入
++counter;
} finally {
lock.unlock();
lock.unlock();
}
}
}
}
Thread t1 = new CountThread();
Thread t2 = new CountThread();
Thread t3 = new CountThread();
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ie) {
System.err.println("ERROR: " + ie.getMessage());
}
System.out.println(counter);
}
}
在这个例子中,我们利用了 ReentrantLock 类来实现合适的锁重入。注意到,我们使用同一个锁的两个调用最后是由同一个线程释放的。endcode
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列) - Python技术站