Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

下面是详细讲解“Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)”的完整攻略。

1. 前言

本文主要分析 Java 并发包中最重要的实现类之一:AbstractQueuedSynchronizer,并以此为引子学习 Java 中的锁与并发编程。具体地,我们会讨论以下几个问题:

  1. AbstractQueuedSynchronizer 的实现原理
  2. ReentrantLockReentrantReadWriteLock 是如何基于 AbstractQueuedSynchronizer 实现的
  3. 什么是条件队列(Condition Queue),以及它和锁、同步器的关系
  4. 如何避免锁的不可重入性

2. AbstractQueuedSynchronizer 概述

AbstractQueuedSynchronizer(简称 AQS)是 Java 并发包中最基础、最重要、也是最复杂的类之一。在 Java 中,AQS 用于实现锁、同步器、阻塞队列等用于并发编程的基本组件。例如,ReentrantLockReentrantReadWriteLockSemaphore, CountDownLatch 等常见的类实现都基于 AQS

AQS 的主要目的是提供基于阻塞的锁和相关同步器的实现。具体地,AQS 客户端使用该同步器的代码应该最好是基于下面这个框架:

class MyMutex {
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, 1);
        }

        protected boolean tryRelease(int releases) {
            return compareAndSetState(1, 0);
        }

        protected int tryAcquireShared(int acquires) {
            return 0;
        }

        protected boolean tryReleaseShared(int releases) {
            return true;
        }
    }
}

在这个例子中,MyMutex 其实就是想要实现一种互斥锁(mutex)。sync 变量则是 AQS 告诉我们,其内部实现需要维护一个同步状态。在这个例子中,Sync 静态内部类继承了 AQS,并同时定义了 tryAcquiretryRelease 方法来实现简单的 semaphore 或者 lock 功能。注意到这些方法都是已经实现好的模板方法,因此,子类实现的任务就是调用这些方法并定义合适的状态值来描述 Sync 对象。

3. ReentrantLockReentrantReadWriteLock

ReentrantLockReentrantReadWriteLock 的实现都基于 AQSReentrantLock 提供了比 Java 语言内部的 synchronized 代码块更加灵活的并发控制,而 ReentrantReadWriteLock 允许多个线程同时读取共享数据,但是对于写操作则进行排它控制,以此来提高系统并发吞吐量并减小锁的竞争。

代码示例:

import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private static int counter = 0;
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        class CountThread extends Thread {
            @Override public void run() {
                for (int i = 0; i < 100_000; ++i) {
                    addOne();
                }
            }
        }

        Thread t1 = new CountThread();
        Thread t2 = new CountThread();
        Thread t3 = new CountThread();

        t1.start();
        t2.start();
        t3.start();

        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException ie) {
            System.err.println("ERROR: " + ie.getMessage());
        }

        System.out.println(counter);
    }

    private static void addOne() {
        lock.lock();
        try {
            ++counter;
        } finally {
            lock.unlock();
        }
    }
}

4. 条件队列(Condition Queue)

除了实现锁和同步器的基本功能之外,AQS 在 Java 中还是一个轻量级的阻塞队列的实现。这个队列本质上是由一个链表数据结构组成的,用于保存需要被阻塞的线程:

另外,AQS 还实现了一个非常重要的接口:java.util.concurrent.locks.Condition。Condition 接口提供了让一个线程在满足某个条件时才能往前执行的功能,也就是允许线程之间进行通信(协调)。具体来说,它可以替代在 Object 类中定义的 wait()/notify() 的使用,以此更好地控制休眠中的线程、目的地唤醒线程。

通过 Condition 接口,我们可以为某个 Lock 对象创建 Condition 对象来进行同步协作,例如:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private static int counter = 0;
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        class CountThread extends Thread {
            @Override public void run() {
                for (int i = 0; i < 100_000; ++i) {
                    addOne();
                }
            }
        }

        Thread t1 = new CountThread();
        Thread t2 = new CountThread();
        Thread t3 = new CountThread();

        t1.start();
        t2.start();
        t3.start();

        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException ie) {
            System.err.println("ERROR: " + ie.getMessage());
        }

        System.out.println(counter);
    }

    private static void addOne() {
        lock.lock();
        try {
            ++counter;
            if (counter % 1_000_000 == 0) {
                System.out.println(counter + " Items Added!");
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

在这个例子中,我们利用了 ReentrantLock 的 condition 对象来实现阻塞的效果。睡眠的线程会在其它线程发出 signal() 或 signalAll() 命令时唤醒。

5. 避免锁的不可重入性

一个锁如果是不可重入(non-reentrant)的,那么同一个线程多次去 acquire() 这个锁,就会卡在那里,进入死锁状态。下面是一个锁的实现,它在不正确进行重入时会卡死:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Lock {
    private AtomicBoolean isLocked = new AtomicBoolean(false);

    public void lock() {
        while (isLocked.compareAndSet(false, true)) {
            // busy waiting
        }
    }

    public void unlock() {
        isLocked.set(false);
    }
}

public class Main {
    private static Lock lock = new Lock();
    private static int counter = 0;

    public static void main(String[] args) {
        class CountThread extends Thread {
            @Override public void run() {
                for (int i = 0; i < 100_000; ++i) {
                    lock.lock();
                    try {
                        lock.lock();  // 锁的不当重入
                        ++counter;
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        Thread t1 = new CountThread();
        Thread t2 = new CountThread();
        Thread t3 = new CountThread();

        t1.start();
        t2.start();
        t3.start();

        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException ie) {
            System.err.println("ERROR: " + ie.getMessage());
        }

        System.out.println(counter);
    }
}

在这个例子中,我们意外地锁在了自己持有的锁上,导致了不可重入状态。为了解决这个问题,我们需要引入同步器层次结构(hierarchical structure of synchronizers)。此外,我们也需要使用 ReentrantLock 自己的机制来进行锁的重入。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private static ReentrantLock lock = new ReentrantLock();
    private static int counter = 0;

    public static void main(String[] args) {
        class CountThread extends Thread {
            @Override public void run() {
                for (int i = 0; i < 100_000; ++i) {
                    lock.lock();
                    try {
                        lock.lock();  // 正确的重入
                        ++counter;
                    } finally {
                        lock.unlock();
                        lock.unlock();
                    }
                }
            }
        }

        Thread t1 = new CountThread();
        Thread t2 = new CountThread();
        Thread t3 = new CountThread();

        t1.start();
        t2.start();
        t3.start();

        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException ie) {
            System.err.println("ERROR: " + ie.getMessage());
        }

        System.out.println(counter);
    }
}

在这个例子中,我们利用了 ReentrantLock 类来实现合适的锁重入。注意到,我们使用同一个锁的两个调用最后是由同一个线程释放的。endcode

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列) - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • Java多线程的具体介绍与使用笔记小结

    Java多线程的具体介绍与使用 什么是多线程 多线程指的是在同一时间内,CPU运行多个线程来完成不同的任务。在Java中,每个线程都是独立的执行路径,使得程序可以分配更多的资源去处理其他任务,并确保线程之间的相互独立。 多线程的优点 多线程的优点主要体现在以下几个方面: 实现并发编程,提升代码的效率和性能; 减少线程资源的竞争,提高程序的响应性和稳定性; 分…

    多线程 2023年5月17日
    00
  • Java多线程饥饿与公平介绍及代码示例

    Java多线程饥饿与公平介绍及代码示例 概述 在并发编程中,线程的调度策略决定了线程的运行顺序和优先级。Java多线程中存在两种调度策略,即公平调度和非公平调度,而线程饥饿则是非公平调度中的一种现象。 公平调度指的是按照线程的申请顺序进行调度,使得线程在等待时间相等的情况下,能够按照一定的顺序得到执行。而非公平调度不保证线程的执行顺序,可能会导致某些线程无法…

    多线程 2023年5月16日
    00
  • Linux下的多线程编程(三)

    Linux下的多线程编程(三)完整攻略 1. pthread_join函数 pthread_join函数主要用于等待一个线程结束,并获取它的退出状态。函数的原型为: int pthread_join(pthread_t thread, void **retval); 其中,第一个参数thread是要等待的线程ID,如果值为零,则等待任何一个线程。第二个参数r…

    多线程 2023年5月17日
    00
  • Python多线程threading模块用法实例分析

    下面我来详细讲解一下“Python多线程threading模块用法实例分析”的攻略。 简介 Python是一门高级编程语言,它在处理大规模数据时十分高效。Python标准库中提供了threading模块,可以在Python中实现多线程编程。多线程的运用可以提高程序的并行度,从而加快程序的运行速度,特别是在处理大规模数据时特别有效。 线程创建 在Python中…

    多线程 2023年5月16日
    00
  • Java多线程实现复制文件

    当需要复制一个较大的文件时,我们可能需要使用多线程来进行文件复制以提高效率。以下是使用Java多线程实现文件复制的完整攻略: 步骤1:导入所需的库 import java.io.*; 在Java程序中,我们需要使用Java IO库来读写文件。因此,首先需要导入这个库。 步骤2:创建文件读取和写入对象 File inputFile = new File(&qu…

    多线程 2023年5月16日
    00
  • 浅析Linux下一个简单的多线程互斥锁的例子

    下面是“浅析Linux下一个简单的多线程互斥锁的例子”的完整攻略。 什么是互斥锁? 互斥锁是一种为了保护临界区资源而提供的同步原语。当一个线程获得了互斥锁之后,其他所有的线程都将被阻塞,直到这个线程释放了互斥锁。这样就保证了临界区资源的独占性,避免了并发访问可能带来的数据竞争问题。 Linux下简单的多线程互斥锁的例子 以下是一个使用互斥锁的线程代码示例。这…

    多线程 2023年5月16日
    00
  • Java 多线程并发编程_动力节点Java学院整理

    Java 多线程并发编程攻略 Java 多线程并发编程是 Java 开发中必不可少的技能,能够充分利用多核 CPU 在同一时间处理多个任务,提高程序的并发性和效率。本文将为大家介绍 Java 多线程并发编程的攻略,包括线程的创建、同步、互斥、线程池等知识点。 线程的创建 Java 中创建线程有两种方式,一种是继承 Thread 类,另一种是实现 Runnab…

    多线程 2023年5月16日
    00
  • java多线程开启的三种方式你知道吗

    当我们需要在Java程序中同时执行多个任务时,可以使用多线程技术来提高程序的效率和响应能力。Java中开启多线程的方式有三种: 继承Thread类并重写run()方法 实现Runnable接口并实现run()方法 实现Callable接口并实现call()方法 1. 继承Thread类并重写run()方法 继承Thread类的方式是最简单也是最常用的开启新线…

    多线程 2023年5月17日
    00
合作推广
合作推广
分享本页
返回顶部