Java并发LinkedBlockingQueue源码分析
简单介绍
LinkedBlockingQueue
是Java并发包中提供的一个阻塞队列实现,它支持在队列两端添加或取出元素,并具有阻塞功能。具体来说,当队列为空时,从队列尾部加入元素的操作将被阻塞;当队列满时,从队列头部取出元素的操作将被阻塞。
源码解析
内部类:Node
在LinkedBlockingQueue类中,有一个内部类Node
用来表示队列中的节点,每个Node
节点包含元素和下一个节点的引用。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
属性成员
源代码中,LinkedBlockingQueue
类有四个属性,分别是队列头部节点head
、队列尾部节点tail
、队列大小count
和队列容量capacity
。
/** Head of linked list */
transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;
/** Current number of elements */
private AtomicInteger count = new AtomicInteger();
/** Maximum number of elements */
private final int capacity;
构造函数
LinkedBlockingQueue
类有三个构造函数,其中两个构造函数分别是用指定容量和默认容量(Integer.MAX_VALUE
)创建队列,另一个构造函数是创建一个空的队列。构造函数主要是用来初始化队列头、尾节点和容量大小等属性。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final AtomicInteger count = this.count;
final Node<E> rear = last;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (count.getAndIncrement() == capacity)
throw new IllegalStateException("Queue full");
rear.next = new Node<E>(e);
rear = rear.next;
}
}
添加元素方法:offer、put、offerLast
public boolean offer(E e)
: 如果队列未满,插入指定的元素到队列末尾,并返回true。否则返回false。public void put(E e) throws InterruptedException
: 如果队列未满,插入指定的元素到队列末尾,否则阻塞当前线程直到队列再次变得可用。public boolean offerLast(E e)
: 插入指定的元素到队列末尾,并返回true。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
final ReentrantLock putLock = this.putLock;
final Node<E> node = new Node<E>(e);
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public boolean offerLast(E e) {
return offer(e);
}
上述方法中,offer
和put
方法在队列满时都会阻塞,之间会用到ReentrantLock
和Condition
来保证线程安全和阻塞功能。optionLast
方法与offer
方法执行的功能相同,不再赘述。
移除元素方法:poll、take、removeFirst
public E poll()
: 移除并获取队列头部的元素,如果队列为空返回null。public E take() throws InterruptedException
: 移除并获取队列头部的元素,如果队列为空阻塞当前线程直到队列有可用元素。public E removeFirst()
: 移除并获取队列头部的元素,如果队列为空抛出NoSuchElementException异常。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
int c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E removeFirst() {
E x = pollFirst();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
上述方法中,poll
和take
方法在队列为空时都会阻塞,之间会用到ReentrantLock
和Condition
来保证线程安全和阻塞功能。removeFirst
方法中,调用了pollFirst
方法,它实现了移除并返回队列头部元素的功能,但如果队列为空则返回null,因此removeFirst
还需要检查返回值是否为null,如果是就抛出NoSuchElementException异常。
示例说明
示例一:消息队列
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable {
private LinkedBlockingQueue<String> queue;
public Producer(LinkedBlockingQueue<String> q) {
queue = q;
}
public void run() {
try {
queue.put("消息1");
System.out.println(Thread.currentThread().getName() + ":消息1已放入队列");
Thread.sleep(1000);
queue.put("消息2");
System.out.println(Thread.currentThread().getName() + ":消息2已放入队列");
Thread.sleep(1000);
queue.put("消息3");
System.out.println(Thread.currentThread().getName() + ":消息3已放入队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private LinkedBlockingQueue<String> queue;
public Consumer(LinkedBlockingQueue<String> q) {
queue = q;
}
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 尝试消费...");
String m1 = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费消息: " + m1);
String m2 = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费消息: " + m2);
String m3 = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费消息: " + m3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread t1 = new Thread(producer, "生产者1");
Thread t2 = new Thread(consumer, "消费者1");
Thread t3 = new Thread(consumer, "消费者2");
t1.start();
t2.start();
t3.start();
}
}
结果输出:
生产者1:消息1已放入队列
消费者1 尝试消费...
消费者1 消费消息: 消息1
消费者2 尝试消费...
生产者1:消息2已放入队列
消费者1 消费消息: 消息2
生产者1:消息3已放入队列
消费者2 消费消息: 消息3
可看出,当队列满时,生产者线程会被阻塞,直到队列变得可用;当队列空时,消费者线程会被阻塞,直到队列有可用元素。这样可以保证生产者、消费者之间的同步与数据一致性。
示例二:线程池任务队列
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Task implements Runnable {
private int id;
public Task(int id) {
this.id = id;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在执行任务-" + id);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 完成任务-" + id);
}
}
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
LinkedBlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(4);
for (int i=1; i<=10; i++) {
try {
taskQueue.put(new Task(i));
System.out.println("任务-" + i + " 已放入队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
while (!taskQueue.isEmpty()) {
try {
executor.execute(taskQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
结果输出:
任务-1 已放入队列
任务-2 已放入队列
任务-3 已放入队列
任务-4 已放入队列
任务-5 已放入队列
任务-6 已放入队列
任务-7 已放入队列
任务-8 已放入队列
任务-9 已放入队列
任务-10 已放入队列
pool-1-thread-1 正在执行任务-1
pool-1-thread-2 正在执行任务-2
pool-1-thread-1 完成任务-1
任务-3 已放入队列
pool-1-thread-2 完成任务-2
pool-1-thread-1 正在执行任务-3
任务-4 已放入队列
pool-1-thread-2 正在执行任务-4
pool-1-thread-1 完成任务-3
任务-5 已放入队列
pool-1-thread-2 完成任务-4
pool-1-thread-1 正在执行任务-5
任务-6 已放入队列
pool-1-thread-2 正在执行任务-6
pool-1-thread-1 完成任务-5
任务-7 已放入队列
pool-1-thread-2 完成任务-6
pool-1-thread-1 正在执行任务-7
任务-8 已放入队列
pool-1-thread-2 正在执行任务-8
pool-1-thread-1 完成任务-7
任务-9 已放入队列
pool-1-thread-2 完成任务-8
pool-1-thread-1 正在执行任务-9
任务-10 已放入队列
pool-1-thread-2 正在执行任务-10
pool-1-thread-1 完成任务-9
pool-1-thread-2 完成任务-10
可以看出,任务队列可以用于线程池中,可以在队列未满时将任务放入队列,当线程池中的线程空闲时,从队列中取出任务并执行。这样可以保证任务的顺序性和线程池的高效性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发LinkedBlockingQueue源码分析 - Python技术站