Java并发之BlockingQueue的使用
在Java的并发编程中,常常需要使用阻塞队列来进行线程间的通信。BlockingQueue提供了一种线程安全、并发的队列实现方式,其中阻塞的特性保证了在队列为空或满时线程的阻塞和唤醒。
BlockingQueue简介
BlockingQueue是Java.util.concurrent包下面的一个接口,它定义了阻塞队列的一些基本操作,包括take、put、offer、poll等方法,是实现生产者-消费者模型的重要工具。
BlockingQueue通常结合着线程池或者定时器一起使用。
BlockingQueue常见的实现类有:
- PriorityBlockingQueue:基于优先级的无限阻塞队列
- ArrayBlockingQueue:基于数组的有界阻塞队列
- LinkedBlockingQueue:基于链表的无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列
BlockingQueue的使用
基本方法
以下是BlockingQueue的四种基本方法的说明:
- put(E e):将元素插入到队尾,如果队列满,则阻塞等待空间可用
- take():返回并移除队头元素,如果队列为空,阻塞等待可用元素
- offer(E e):将元素插入到队尾,如果队列已满则返回false
- poll(long timeout, TimeUnit unit):等待指定时长来获取队列头部的元素,如果在等待时间内没有可用元素,则返回null
示例一:生产者-消费者模型
生产者线程将数据放入阻塞队列中,消费者线程从阻塞队列中取出数据进行处理。为了刻画生产者和消费者,这里使用两个不同的线程来演示。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerDemo {
private static final int CAPACITY = 5;
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
static class Producer implements Runnable {
private BlockingQueue<String> queue;
Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
System.out.println("Produced 1");
Thread.sleep(1000);
queue.put("2");
System.out.println("Produced 2");
Thread.sleep(1000);
queue.put("3");
System.out.println("Produced 3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue<String> queue;
Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println("Consumed " + queue.take());
Thread.sleep(1000);
System.out.println("Consumed " + queue.take());
Thread.sleep(1000);
System.out.println("Consumed " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上述代码中,Producer线程将三个数据依次放入到阻塞队列中,每放入一个数据就打印出来。Consumer线程则从队列中取出三个数据进行处理。
示例二:多任务并行处理
在一个线程池中,多个任务可以并发地进行处理,BlockingQueue可以很容易地实现这种需求。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskHandlerDemo {
private static final int THREAD_POOL_SIZE = 3;
private static final int TASK_QUEUE_SIZE = 10;
private static BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(TASK_QUEUE_SIZE);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
executorService.submit(new Worker(i));
}
for (int i = 0; i < TASK_QUEUE_SIZE; i++) {
try {
taskQueue.put("Task " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
static class Worker implements Runnable {
private int id;
Worker(int id) {
this.id = id;
}
@Override
public void run() {
String task;
while (true) {
try {
task = taskQueue.take();
System.out.println("Worker " + id + " handles " + task);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
上述代码中,创建了一个线程池和一个大小为10的BlockingQueue。在第一个循环中,提交了三个Worker任务。在第二个循环中,向队列中加入10个任务,而这些任务是互不相同的。这样子程序就会开始并行处理这些任务。
总结
- BlockingQueue提供了一种线程安全、并发的队列实现方式
- BlockingQueue是实现生产者-消费者模型的重要工具
- BlockingQueue一般结合线程池或者定时器一起使用
- 阻塞队列通常有以下几种实现:PriorityBlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
希望这篇文章对大家有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发之BlockingQueue的使用 - Python技术站