Java并发LinkedBlockingQueue源码分析

Java并发LinkedBlockingQueue源码分析

简单介绍

LinkedBlockingQueue是Java并发包中提供的一个阻塞队列实现,它支持在队列两端添加或取出元素,并具有阻塞功能。具体来说,当队列为空时,从队列尾部加入元素的操作将被阻塞;当队列满时,从队列头部取出元素的操作将被阻塞。

源码解析

内部类:Node

在LinkedBlockingQueue类中,有一个内部类Node用来表示队列中的节点,每个Node节点包含元素和下一个节点的引用。

static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

属性成员

源代码中,LinkedBlockingQueue类有四个属性,分别是队列头部节点head、队列尾部节点tail、队列大小count和队列容量capacity

/** Head of linked list */
transient Node<E> head;

/** Tail of linked list */
private transient Node<E> last;

/** Current number of elements */
private AtomicInteger count = new AtomicInteger();

/** Maximum number of elements */
private final int capacity;

构造函数

LinkedBlockingQueue类有三个构造函数,其中两个构造函数分别是用指定容量和默认容量(Integer.MAX_VALUE)创建队列,另一个构造函数是创建一个空的队列。构造函数主要是用来初始化队列头、尾节点和容量大小等属性。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final AtomicInteger count = this.count;
    final Node<E> rear = last;
    for (E e : c) {
        if (e == null)
            throw new NullPointerException();
        if (count.getAndIncrement() == capacity)
            throw new IllegalStateException("Queue full");
        rear.next = new Node<E>(e);
        rear = rear.next;
    }
}

添加元素方法:offer、put、offerLast

  • public boolean offer(E e): 如果队列未满,插入指定的元素到队列末尾,并返回true。否则返回false。
  • public void put(E e) throws InterruptedException: 如果队列未满,插入指定的元素到队列末尾,否则阻塞当前线程直到队列再次变得可用。
  • public boolean offerLast(E e): 插入指定的元素到队列末尾,并返回true。
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    final ReentrantLock putLock = this.putLock;
    final Node<E> node = new Node<E>(e);
    int c = -1;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

public boolean offerLast(E e) {
    return offer(e);
}

上述方法中,offerput方法在队列满时都会阻塞,之间会用到ReentrantLockCondition来保证线程安全和阻塞功能。optionLast方法与offer方法执行的功能相同,不再赘述。

移除元素方法:poll、take、removeFirst

  • public E poll(): 移除并获取队列头部的元素,如果队列为空返回null。
  • public E take() throws InterruptedException: 移除并获取队列头部的元素,如果队列为空阻塞当前线程直到队列有可用元素。
  • public E removeFirst(): 移除并获取队列头部的元素,如果队列为空抛出NoSuchElementException异常。
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E take() throws InterruptedException {
    E x;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        int c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E removeFirst() {
    E x = pollFirst();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

上述方法中,polltake方法在队列为空时都会阻塞,之间会用到ReentrantLockCondition来保证线程安全和阻塞功能。removeFirst方法中,调用了pollFirst方法,它实现了移除并返回队列头部元素的功能,但如果队列为空则返回null,因此removeFirst还需要检查返回值是否为null,如果是就抛出NoSuchElementException异常。

示例说明

示例一:消息队列

import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private LinkedBlockingQueue<String> queue;

    public Producer(LinkedBlockingQueue<String> q) {
        queue = q;
    }

    public void run() {
        try {
            queue.put("消息1");
            System.out.println(Thread.currentThread().getName() + ":消息1已放入队列");
            Thread.sleep(1000);
            queue.put("消息2");
            System.out.println(Thread.currentThread().getName() + ":消息2已放入队列");
            Thread.sleep(1000);
            queue.put("消息3");
            System.out.println(Thread.currentThread().getName() + ":消息3已放入队列");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private LinkedBlockingQueue<String> queue;

    public Consumer(LinkedBlockingQueue<String> q) {
        queue = q;
    }

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " 尝试消费...");
            String m1 = queue.take();
            System.out.println(Thread.currentThread().getName() + " 消费消息: " + m1);
            String m2 = queue.take();
            System.out.println(Thread.currentThread().getName() + " 消费消息: " + m2);
            String m3 = queue.take();
            System.out.println(Thread.currentThread().getName() + " 消费消息: " + m3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        Thread t1 = new Thread(producer, "生产者1");
        Thread t2 = new Thread(consumer, "消费者1");
        Thread t3 = new Thread(consumer, "消费者2");

        t1.start();
        t2.start();
        t3.start();
    }
}

结果输出:

生产者1:消息1已放入队列
消费者1 尝试消费...
消费者1 消费消息: 消息1
消费者2 尝试消费...
生产者1:消息2已放入队列
消费者1 消费消息: 消息2
生产者1:消息3已放入队列
消费者2 消费消息: 消息3

可看出,当队列满时,生产者线程会被阻塞,直到队列变得可用;当队列空时,消费者线程会被阻塞,直到队列有可用元素。这样可以保证生产者、消费者之间的同步与数据一致性。

示例二:线程池任务队列

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Task implements Runnable {
    private int id;

    public Task(int id) {
        this.id = id;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " 正在执行任务-" + id);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 完成任务-" + id);
    }
}

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        LinkedBlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(4);

        for (int i=1; i<=10; i++) {
            try {
                taskQueue.put(new Task(i));
                System.out.println("任务-" + i + " 已放入队列");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        while (!taskQueue.isEmpty()) {
            try {
                executor.execute(taskQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();
    }
}

结果输出:

任务-1 已放入队列
任务-2 已放入队列
任务-3 已放入队列
任务-4 已放入队列
任务-5 已放入队列
任务-6 已放入队列
任务-7 已放入队列
任务-8 已放入队列
任务-9 已放入队列
任务-10 已放入队列
pool-1-thread-1 正在执行任务-1
pool-1-thread-2 正在执行任务-2
pool-1-thread-1 完成任务-1
任务-3 已放入队列
pool-1-thread-2 完成任务-2
pool-1-thread-1 正在执行任务-3
任务-4 已放入队列
pool-1-thread-2 正在执行任务-4
pool-1-thread-1 完成任务-3
任务-5 已放入队列
pool-1-thread-2 完成任务-4
pool-1-thread-1 正在执行任务-5
任务-6 已放入队列
pool-1-thread-2 正在执行任务-6
pool-1-thread-1 完成任务-5
任务-7 已放入队列
pool-1-thread-2 完成任务-6
pool-1-thread-1 正在执行任务-7
任务-8 已放入队列
pool-1-thread-2 正在执行任务-8
pool-1-thread-1 完成任务-7
任务-9 已放入队列
pool-1-thread-2 完成任务-8
pool-1-thread-1 正在执行任务-9
任务-10 已放入队列
pool-1-thread-2 正在执行任务-10
pool-1-thread-1 完成任务-9
pool-1-thread-2 完成任务-10

可以看出,任务队列可以用于线程池中,可以在队列未满时将任务放入队列,当线程池中的线程空闲时,从队列中取出任务并执行。这样可以保证任务的顺序性和线程池的高效性。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发LinkedBlockingQueue源码分析 - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • java 多线程-线程通信实例讲解

    下面是关于“java 多线程-线程通信实例讲解”的完整攻略: 1. 为什么需要线程通信? 在多线程场景下,线程之间需要相互协作才能完成复杂的逻辑。通常情况下,线程之间的协作需要通过线程通信来实现。 在实际应用中,线程通信主要包括以下两种场景: 生产者和消费者模式:生产者线程负责生产数据,消费者线程负责消费数据。生产者线程需要将生产的数据传递给消费者线程,消费…

    多线程 2023年5月17日
    00
  • 5个并发处理技巧代码示例

    下面我来详细讲解一下“5个并发处理技巧代码示例”的完整攻略。 1. 使用锁机制 在并发处理时,如果多个线程同时访问同一份数据,就会发生数据竞争的问题。为了避免这种问题,可以使用锁机制来实现线程的同步。 例如,下面这段代码展示了如何使用sync.Mutex锁来保证线程安全: import ( "fmt" "sync" )…

    多线程 2023年5月16日
    00
  • Java多线程面试题(面试官常问)

    下面就来详细讲解一下“Java多线程面试题(面试官常问)”的完整攻略。 一、题目解析 在多线程的面试过程中,常会遇到关于线程的基本概念、线程的安全性、线程池的使用等方面的问题。常见的面试题目包括: 1. 什么是线程? 线程是指操作系统能够进行运算调度的最小单位,是程序执行过程中的一个执行单元。 2. 什么是线程安全? 线程安全是指在多线程并发的情况下,共享的…

    多线程 2023年5月16日
    00
  • Go并发与锁的两种方式该如何提效详解

    Go并发与锁的两种方式该如何提效详解 先谈一下Go中的协程和锁 Go语言的协程是一种并发执行代码的方式。协程可以方便的并发执行任务,不需要等待前面的任务完成,直接执行下一个任务,提高了程序运行的效率。 而锁则可以保证在多个协程同时访问共享数据时不会发生冲突。 对于共享数据的并发访问,常用的两种方式 1. 互斥锁 互斥锁是最常用的一种锁。它可以保证在同一时刻只…

    多线程 2023年5月16日
    00
  • java多线程下载实例详解

    Java多线程下载实例详解 本文将介绍Java多线程下载的实现方法和步骤,并提供两个示例说明。 实现步骤 Java多线程下载的实现步骤如下: 获取需要下载的文件的URL地址。 创建多个线程,每个线程负责下载文件的不同部分。 启动多个线程,通过HTTP请求下载各自负责的文件部分。 合并下载完成的文件部分。 完成文件下载。 示例一:Java多线程文件下载 以下示…

    多线程 2023年5月17日
    00
  • Java多线程 线程组原理及实例详解

    Java多线程 线程组原理及实例详解 什么是线程组 线程组是多线程编程中用来管理线程的一种手段,它可以帮助开发者更方便地对线程进行分组、统计信息、控制等操作。线程组通过ThreadGroup类进行实现。 线程组的创建 线程组的创建可以通过如下两种方式进行: 1.无参构造方法创建 ThreadGroup group = new ThreadGroup(&quo…

    多线程 2023年5月17日
    00
  • Java超详细讲解多线程中的Process与Thread

    Java超详细讲解多线程中的Process与Thread攻略 什么是Process与Thread 在Java多线程编程中,Process和Thread是两个重要的概念。其中,Process代表着一个正在执行的进程,而Thread则代表着进程中的一个执行单元。通常一个进程中可以包含多个线程,每个线程都可以独立运行并且具有自己的执行路径、堆栈和局部变量。 Pro…

    多线程 2023年5月17日
    00
  • 浅谈Java并发中ReentrantLock锁应该怎么用

    当我们需要在并发环境下保证数据的正确性时,可以使用Java中的锁来达到目的。其中ReentrantLock是一种可重入锁,也就是说,它可以被同一个线程重复获取,防止了死锁的发生。但是,ReentrantLock的正确使用也需要一些细节上的注意,下面详细讲解一下ReentrantLock在Java并发编程中的应用。 一、ReentrantLock的常规使用方法…

    多线程 2023年5月17日
    00
合作推广
合作推广
分享本页
返回顶部