要深入了解Java中的多线程并发AbstractQueuedSynchronizer(AQS)需要掌握以下三个方面的知识:
- AQS是什么?
- AQS的使用方式是怎样的?
- AQS的示例说明是怎样的?
下面将按照这三个方面的顺序逐一讲解。
1. AQS是什么?
AQS是Java.util.concurrent包中的一个类,它是所有同步类的基础。AQS的主要作用是提供了实现协调线程间共享访问并发访问的基础框架,也就是说AQS是实现锁和其他同步工具的关键性组件。
AQS的核心思想是,如果共享资源被占用,那么就需要先排队等待获取共享资源的权限,排队使用FIFO队列进行,当共享资源被释放后,排队等待的线程会依次竞争获取共享资源的权限。因此,AQS提供了同步状态的获取、等待和释放的机制,同时支持线程间挂起与唤醒。
2. AQS的使用方式是怎样的?
AQS的使用方式是通过自定义同步组件来实现,同步组件一般继承AQS类并实现tryAcquire、tryRelease、tryAcquireShared等方法来获取或释放共享资源的权限。
具体的使用步骤如下:
- 继承AQS类并实现tryAcquire、tryRelease、tryAcquireShared等方法
- 使用AQS提供的acquire、acquireShare等方法来实现同步操作
- 应用程序中使用自定义同步组件实现并发控制
以下是一个简单的互斥锁示例,通过继承AQS实现同步组件,使用acquire和release方法实现互斥控制:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Mutex extends AbstractQueuedSynchronizer {
// 独占方式获取同步状态,该方法需要查询当前同步状态并判断同步状态是否符合预期,
// 然后再进行CAS设置同步状态。这里实现的是不可重入锁。
protected boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个状态量
if (compareAndSetState(0, 1)) { // 使用CAS设置当前状态。如果当前状态为0,则设置为1. 如果当前状态不是0,则表示已经有线程占用该同步状态,返回false
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前拥有独占访问权限的线程
return true;
}
return false;
}
// 独占方式释放同步状态,该方法会释放掉当前线程所持有的同步状态,
// 然后将同步状态置为可以被占用的状态。
protected boolean tryRelease(int releases) {
assert releases == 1; // 这里限定只能为1个状态量
if (getState() == 0) throw new IllegalMonitorStateException(); // 如果当前状态为0,则抛出异常。因为一个未被持有的状态,无法释放
setExclusiveOwnerThread(null); // 清空当前线程持有的状态
setState(0); // 将同步状态设置为未被占用状态
return true;
}
// 共享方式获取同步状态,该方法需要查询当前同步状态并判断同步状态是否符合预期,
// 然后再进行CAS设置同步状态。
protected int tryAcquireShared(int acquires) {
return super.tryAcquireShared(acquires);
}
// 共享方式释放同步状态,该方法会释放掉当前线程所持有的同步状态,
// 然后将同步状态置为可以被占用的状态。
protected boolean tryReleaseShared(int releases) {
return super.tryReleaseShared(releases);
}
}
其中,tryAcquire和tryRelease方法是实现独占式获取和释放同步状态的方法,而tryAcquireShared和tryReleaseShared方法是实现获取和释放共享式同步状态的方法。对于互斥锁来说,只需要关注tryAcquire和tryRelease方法即可。
3. AQS的示例说明是怎样的?
下面是一个基于AQS实现的阻塞队列的示例说明:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.InterruptedException;
public class BlockingQueue<E> {
private final Lock lock = new ReentrantLock(); // 锁对象
private final Condition notEmpty = lock.newCondition(); // 非空条件变量
private final Condition notFull = lock.newCondition(); // 非满条件变量
private final Object[] items; // 内部存储结构
private int count; // 队列中元素的个数
private int putIndex; // 下一个元素的插入位置
private int takeIndex; // 下一个元素的删除位置
public BlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("capacity must be greater than 0");
}
items = new Object[capacity];
}
public void put(E e) throws InterruptedException {
lock.lock(); // 加锁
try {
while (count == items.length) { // 队列已满,需要等待非满条件
notFull.await(); // 非满条件变量等待
}
items[putIndex] = e; // 插入操作
if (++putIndex == items.length) { // 如果达到队列内部存储长度,需要回绕
putIndex = 0;
}
count++;
notEmpty.signal(); // 唤醒等待非空条件的线程
} finally {
lock.unlock(); // 解锁
}
}
public E take() throws InterruptedException {
lock.lock(); // 加锁
try {
while (count == 0) { // 队列为空,需要等待非空条件
notEmpty.await(); // 非空条件变量等待
}
Object item = items[takeIndex]; // 取出操作
items[takeIndex] = null;
if (++takeIndex == items.length) { // 如果达到队列内部存储长度,需要回绕
takeIndex = 0;
}
count--;
notFull.signal(); // 唤醒等待非满条件的线程
return (E) item;
} finally {
lock.unlock(); // 解锁
}
}
}
这个示例是一个简单的阻塞队列,采用了AQS提供的Lock、Condition等同步工具来实现。当队列为空时,take操作需要等待notEmpty条件变量;当队列已满时,put操作需要等待notFull条件变量。从而实现了阻塞等待的功能,这种情况下,synchronized关键字是无法很好地实现的。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java 多线程并发AbstractQueuedSynchronizer详情 - Python技术站