深入浅出学习AQS组件攻略
什么是AQS
AQS (AbstractQueuedSynchronizer) 是 Java 并发包提供的一个用于构建锁和同步器的基础框架,是Java并发编程中重要的底层实现。
AQS的设计思想是对java.util.concurrent包所有同步器的公共行为进行抽象和封装,以便于在实现具体同步器(如ReentrantLock、Semaphore等)时使用,该组件的实现使用了一个FIFO队列和一个int变量(state)来实现对同步状态的管理,具有灵活性和扩展性,可以方便地支持类似独占锁和共享锁这样的同步语义。
AQS实现原理
AQS的实现原理可以在深度掌握Java并发编程知识的基础上来进行深入研究和理解。
AQS内部使用一个FIFO队列来记录等待其他线程释放锁的线程,这个队列的元素为Node。每个Node包含了线程本身的信息以及该线程在等待队列中的前驱和后继节点信息,线程这种方式被包装起来以便于同步队列管理。
AQS要求实现的同步器需要维护一个int变量state,state表示同步器的状态,具体含义由同步器自己来定义。
AQS通过CAS操作(Compare And Swap,即无阻塞的乐观锁机制)来保证对共享资源的访问的正确性,当一个线程试图获取锁时,AQS会首先尝试原子地把state的值的某个位(或几个位)由0改为1,如果成功了则说明该线程获取了锁,否则,AQS会把这个线程“安排”到等待队列中为获取到锁而等待。当锁被持有者释放时,AQS会从等待队列中唤醒一个线程,被唤醒的线程会再次尝试获取锁。
AQS的应用
AQS可以用于实现独占锁和共享锁两种同步方式。
独占锁
独占锁只能由一个线程来占有,实现可重入的、互斥访问共享资源的目的。其常见的实现是ReentrantLock类。
具体实现代码如下:
public class MyReentrantLock extends AbstractQueuedSynchronizer {
/**
* Returns true if lock is held by current thread else false.
* @return boolean
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* Try to acquire lock.
* From AbstractQueuedSynchronizer#tryAcquire
*
* @param args args
* @return boolean
*/
@Override
protected boolean tryAcquire(int args) {
if(compareAndSetState(0, 1)) {
// Exclusive ownership has been established
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* Try to release lock.
* From AbstractQueuedSynchronizer#tryRelease
*
* @param args args
* @return boolean
*/
@Override
protected boolean tryRelease(int args) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
从代码中可以看到,MyReentrantLock继承了AbstractQueuedSynchronizer,并实现了父类的三个方法用于协同控制锁的获取和释放。
共享锁
共享锁允许多个线程同时访问共享资源,不能独占资源。Semaphore和CountDownLatch就是基于共享锁同步器实现的。
Semaphore
Semaphore是用来控制同时访问特定资源的线程数量,常用于限制线程并发访问的情况。一个Semaphore实例有一个permits变量和一个等待队列,permits表示最多有多少个线程同时访问资源,当获取许可时,permits减1;当释放许可时,permits加1。
从下面的示例代码可以看出Semaphore用法。
public class SemaphoreTest {
static class MyTask implements Runnable {
Semaphore semaphore;
MyTask(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获取许可,开始执行");
Thread.sleep((long) (Math.random() * 1000));
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 释放许可,继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int permits = 5;
Semaphore semaphore = new Semaphore(permits);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
MyTask task = new MyTask(semaphore);
executorService.execute(task);
}
executorService.shutdown();
}
}
运行结果如下:
pool-1-thread-1 获取许可,开始执行
pool-1-thread-5 获取许可,开始执行
pool-1-thread-2 获取许可,开始执行
pool-1-thread-3 获取许可,开始执行
pool-1-thread-8 获取许可,开始执行
pool-1-thread-6 获取许可,开始执行
pool-1-thread-9 获取许可,开始执行
pool-1-thread-4 获取许可,开始执行
pool-1-thread-7 获取许可,开始执行
pool-1-thread-5 释放许可,继续执行
pool-1-thread-3 释放许可,继续执行
pool-1-thread-1 释放许可,继续执行
pool-1-thread-6 释放许可,继续执行
pool-1-thread-4 释放许可,继续执行
pool-1-thread-2 释放许可,继续执行
pool-1-thread-8 释放许可,继续执行
pool-1-thread-7 释放许可,继续执行
pool-1-thread-9 释放许可,继续执行
从结果中可以看出,最多只有5个线程同时被允许执行。当许可被占满时,后续线程会进入等待队列,等待正在执行的线程释放许可。
CountDownLatch
CountDownLatch是一种多线程同步工具,允许一个或多个线程等待其他线程完成操作。CountDownLatch初始化时需要一个计数器,表示需要等待的线程数量;每个线程完成操作后,计数器减1;当计数器为0时,所有等待线程被唤醒。
public class CountDownLatchTest {
static class Worker implements Runnable {
private final CountDownLatch latch;
Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 正在执行");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " 执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executor.execute(new Worker(latch));
}
executor.shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("所有线程执行完毕");
}
}
}
运行结果如下:
pool-1-thread-1 正在执行
pool-1-thread-4 正在执行
pool-1-thread-5 正在执行
pool-1-thread-2 正在执行
pool-1-thread-3 正在执行
pool-1-thread-2 执行完毕
pool-1-thread-1 执行完毕
pool-1-thread-5 执行完毕
pool-1-thread-4 执行完毕
pool-1-thread-3 执行完毕
所有线程执行完毕
可以看到,所有线程执行完毕后,主线程才继续向下执行。如果没有CountDownLatch等待线程的机制,主线程无法保证输出语句在所有线程运行完毕后再被执行。
总结来说,AQS提供了一种底层、灵活的实现多种同步方式的框架,既方便了同步器的实现,也为高层级的同步工具(如ReentrantLock、Semaphore、CountDownLatch等)提供了基础。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:深入浅出学习AQS组件 - Python技术站