使用Java手写阻塞队列是一种常见的并发编程技巧。这在许多场合下非常有用,例如当多个线程需要访问共享资源时,或者需要实现生产者-消费者模型时。下面是手写阻塞队列示例代码及其解释:
步骤1:定义接口
interface CustomBlockingQueue<T> {
void put(T item) throws InterruptedException;
T take() throws InterruptedException;
}
这里定义了一个泛型接口CustomBlockingQueue
来表示阻塞队列。它包含两个方法:put
和take
。阻塞队列的生产者使用put
方法向队列中添加元素,在队列已满时会阻塞等待;而消费者使用take
方法从队列中获取元素,在队列为空时会阻塞等待。这样可以防止生产者向队列中添加元素过多导致队列溢出,或者消费者从队列中获取元素过多导致队列为空。
步骤2:实现接口
class CustomArrayBlockingQueue<T> implements CustomBlockingQueue<T> {
private final T[] items;
private volatile int head, tail;
private final Object notEmpty = new Object();
private final Object notFull = new Object();
@SuppressWarnings("unchecked")
public CustomArrayBlockingQueue(int maxSize) {
items = (T[]) new Object[maxSize];
}
public void put(T item) throws InterruptedException {
while (isFull()) {
synchronized (notFull) {
notFull.wait();
}
}
items[tail] = item;
if (++tail == items.length) {
tail = 0;
}
synchronized (notEmpty) {
notEmpty.notify();
}
}
public T take() throws InterruptedException {
while (isEmpty()) {
synchronized (notEmpty) {
notEmpty.wait();
}
}
T item = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
synchronized (notFull) {
notFull.notify();
}
return item;
}
private boolean isEmpty() {
return head == tail && items[head] == null;
}
private boolean isFull() {
return head == tail && items[head] != null;
}
}
上面的代码实现了CustomBlockingQueue
接口定义的两个方法:put
和take
。它使用了Object
类型的锁和wait
和notify
方法来实现线程的等待和唤醒。队列容量大小是有限的,由构造函数参数maxSize
指定。当队列已满时,put
方法将会等待直到队列空闲;而当队列为空时,take
方法将会等待直到队列非空。队列的头末指针head
和tail
用来指示当前队列状态,head
指向队列的头部,tail
指向队列的尾部。当head
和tail
相等时,队列可能为空或者已满,需要进行判断。put和take方法在结束时都会唤醒可能等待的线程。
示例1:多个生产者和消费者共享阻塞队列
下面的代码示例说明了如何使用上面实现的阻塞队列来实现多个生产者和消费者共享一个队列。
假设我们有两个生产者和两个消费者:Producer1
、Producer2
、Consumer1
和 Consumer2
。我们让所有生产者和消费者共享一个阻塞队列。
public class SharedBlockingQueueExample {
public static void main(String[] args) {
CustomBlockingQueue<String> queue = new CustomArrayBlockingQueue<>(20);
Producer producer1 = new Producer(queue, "Producer1");
Producer producer2 = new Producer(queue, "Producer2");
Consumer consumer1 = new Consumer(queue, "Consumer1");
Consumer consumer2 = new Consumer(queue, "Consumer2");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}
生产者线程和消费者线程的实现如下:
class Producer extends Thread {
private final CustomBlockingQueue<String> queue;
private final String name;
public Producer(CustomBlockingQueue<String> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
for (int i = 1; i <= 100; i++) {
String item = "item " + i + " from " + name;
queue.put(item);
System.out.println("put " + item);
sleep(new Random().nextInt(100));
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
class Consumer extends Thread {
private final CustomBlockingQueue<String> queue;
private final String name;
public Consumer(CustomBlockingQueue<String> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
while (true) {
String item = queue.take();
System.out.println("take " + item + " by " + name);
sleep(new Random().nextInt(100));
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
运行这个程序,我们会看到所有的生产者和消费者不断地生产和消费元素,并且队列的容量保持在20个以内。如果队列已满,则生产者线程会被阻塞等待;如果队列为空,则消费者线程会被阻塞等待。这确保了队列中的元素数量不会超过我们设定的容量。
示例2:阻塞队列实现生产者-消费者模型
下面的代码示例说明了如何使用上面实现的阻塞队列来实现生产者-消费者模型。
假设我们有一个生产者线程和一个消费者线程,它们共享一个阻塞队列,生产者线程不断地向队列中添加元素,消费者线程不断地从队列中获取元素。我们设置队列容量为10,一旦队列已满则生产者线程将会被阻塞等待;一旦队列为空则消费者线程将会被阻塞等待。
public class ProducerConsumerExample {
public static void main(String[] args) {
CustomBlockingQueue<Integer> queue = new CustomArrayBlockingQueue<>(10);
ProducerThread producer = new ProducerThread(queue);
ConsumerThread consumer = new ConsumerThread(queue);
producer.start();
consumer.start();
}
}
class ProducerThread extends Thread {
private final CustomBlockingQueue<Integer> queue;
ProducerThread(CustomBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i <= 50; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
class ConsumerThread extends Thread {
private final CustomBlockingQueue<Integer> queue;
ConsumerThread(CustomBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
在此示例程序中,我们生成了50个数字并向阻塞队列中添加。由于队列的容量为10,当队列已满时,生产者线程将会被阻塞等待,直到队列有空闲位置。在消费者线程中,我们不断地从队列中获取元素,如果队列为空,则消费者线程会被阻塞等待,直到队列非空。
总结
实现阻塞队列是一项非常重要的并发编程技巧。在实际开发中,我们通常会使用Java内置的阻塞队列实现(如java.util.concurrent.ArrayBlockingQueue
和java.util.concurrent.LinkedBlockingQueue
等)。手写的阻塞队列实现更多地是一种理论上的练习。无论使用哪种方法,阻塞队列的基本概念和操作方法都是类似的。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:利用Java手写阻塞队列的示例代码 - Python技术站