JUC并发编程LinkedBlockingQueue队列深入分析源码
1. LinkedBlockingQueue简介
LinkedBlockingQueue是Java集合框架中的一种队列,它实现了BlockingQueue接口,并且是线程安全的,支持高效并发读写操作。LinkedBlockingQueue是一种无界队列,因此队列中的元素数量可以无限增长,不像ArrayBlockingQueue那样在队列满时会抛出异常。
2. LinkedBlockingQueue源码分析
先来看一下LinkedBlockingQueue的构造方法:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
可以看出LinkedBlockingQueue有两种构造方式,如果不指定初始容量,则容量为Integer.MAX_VALUE,否则将容量设置为指定值,并且要求初始容量必须大于0。
LinkedBlockingQueue是一个链式结构的阻塞队列,所以在实现上使用双向链表来存储数据,链表头节点head和尾节点last的值为null,不存储元素,只是用于标识队列的起点和终点。每个结点都包括节点元素(item)和下一个节点的引用(next)。
其中Node的内部类定义如下:
private static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
LinkedBlockingQueue内部还有两个与队列操作相关的锁:putLock和takeLock。putLock是执行插入操作时所获得的锁,而takeLock是执行获取操作时所获得的锁。这两个锁的目的是控制并发对队列的访问,保证线程安全。
LinkedBlockingQueue的插入方法offer分为两个步骤:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
第一步是通过ReentrantLock的lock()方法来获取putLock锁,插入操作是对队列尾部进行操作,先创建一个新节点node,并将putLock锁住,保证线程安全。然后判断队列是否已满,若未满则将新节点插入到队列尾部。
第二步是若队列为空,通知当前所有正在等待takeLock锁的线程不为空,因为有数据入队了。
接下来是LinkedBlockingQueue的获取方法take的源码:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
take方法首先对队列头进行访问,因此获取的是takeLock锁,并且在获取锁的过程中可以被中断。其次,如果队列为空,则需要进入await()状态,等待其他线程插入元素后再通知取出数据并且处理。
3. LinkedBlockingQueue应用示例
我们可以使用LinkedBlockingQueue来实现生产者消费者模型。
先看生产者:
public class Producer implements Runnable{
private final LinkedBlockingQueue<String> queue;
public Producer(LinkedBlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
for(int i = 0 ;i < 10; i++){
String s = "product" + i;
queue.put(s);
System.out.println("生产者-" + Thread.currentThread().getName() + " 生产:" + s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
生产者使用put()方法将数据放进队列中。
然后是消费者:
public class Consumer implements Runnable{
private final LinkedBlockingQueue<String> queue;
public Consumer(LinkedBlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
while(true){
String s = queue.take();
System.out.println("消费者-" + Thread.currentThread().getName() + " 消费:" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者使用take()方法从队列中取出数据。
最后,启动生产者和消费者线程:
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer1).start();
new Thread(producer2).start();
new Thread(producer3).start();
new Thread(consumer).start();
注意,我们使用了构造方法LinkedBlockingQueue<>(3)来创建一个容量为3的队列。这里启动了3个生产者和1个消费者,每个生产者往队列中插入10个数据,总共会插入30个数据,而消费者则使用了无限循环来取出数据,直接将队列中所有数据全部取出。
运行结果如下:
生产者-Thread-0 生产:product0
生产者-Thread-1 生产:product0
消费者-Thread-3 消费:product0
生产者-Thread-1 生产:product1
生产者-Thread-2 生产:product0
消费者-Thread-3 消费:product1
生产者-Thread-1 生产:product2
生产者-Thread-2 生产:product1
消费者-Thread-3 消费:product2
生产者-Thread-1 生产:product3
生产者-Thread-2 生产:product2
消费者-Thread-3 消费:product3
生产者-Thread-1 生产:product4
生产者-Thread-2 生产:product3
消费者-Thread-3 消费:product4
生产者-Thread-1 生产:product5
生产者-Thread-2 生产:product4
消费者-Thread-3 消费:product5
生产者-Thread-1 生产:product6
生产者-Thread-2 生产:product5
消费者-Thread-3 消费:product6
生产者-Thread-1 生产:product7
生产者-Thread-2 生产:product6
消费者-Thread-3 消费:product7
生产者-Thread-1 生产:product8
生产者-Thread-2 生产:product7
消费者-Thread-3 消费:product8
生产者-Thread-1 生产:product9
生产者-Thread-2 生产:product8
消费者-Thread-3 消费:product9
消费者-Thread-3 消费:product0
消费者-Thread-3 消费:product1
消费者-Thread-3 消费:product2
消费者-Thread-3 消费:product3
消费者-Thread-3 消费:product4
消费者-Thread-3 消费:product5
消费者-Thread-3 消费:product6
消费者-Thread-3 消费:product7
消费者-Thread-3 消费:product8
消费者-Thread-3 消费:product9
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:JUC并发编程LinkedBlockingQueue队列深入分析源码 - Python技术站