一文探究ArrayBlockingQueue函数及应用场景
介绍
ArrayBlockingQueue是Java中的一个阻塞队列实现类,它是一个支持在队列的两端插入和删除元素的线程安全队列。它的大小是有限的,当队列已满时,插入操作会阻塞线程,直到队列有空闲空间;当队列为空时,获取操作会阻塞线程,直到队列有可用元素。
使用方法
创建ArrayBlockingQueue
ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(capacity);
- T表示队列中元素的类型。
- capacity是队列的容量,即队列最多能存放多少个元素。
插入元素
queue.put(element);
- element表示要插入的元素。
- 如果队列已经满了,则会阻塞线程。
删除元素
queue.take();
- 如果队列为空,则会阻塞线程,并等待有可用元素。
- 返回队列中最开始的元素,并将其从队列中删除。
查看队列元素个数和容量
queue.size(); // 查看队列元素个数
queue.remainingCapacity(); // 查看队列剩余容量
应用场景
多线程协作
在多线程编程中,一个线程可能要等待另一个线程的操作完成才能进行下一步操作。此时,可以使用ArrayBlockingQueue来实现两个线程之间的协作。
例如,一个线程需要获取另一个线程计算的结果,可以使用一个容量为1的ArrayBlockingQueue来进行线程之间的通信。
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);
Thread t1 = new Thread(() -> {
int result = calculate(); // 计算结果
try {
queue.put(result); // 将结果放入队列中
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
int result = queue.take(); // 从队列中取出结果
processResult(result); // 处理结果
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
任务队列
在一些系统中,可能需要有一个任务队列,用来存放需要执行的任务。当有空闲的线程时,从任务队列中取出一个任务进行处理。
此时,可以使用一个容量为N的ArrayBlockingQueue来实现任务队列的功能。
public class TaskQueue {
private final int queueSize = 10;
private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize);
public void enqueueTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void processTasks() {
while (true) {
try {
Runnable task = queue.take();
task.run(); // 执行任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 在主线程中使用
TaskQueue taskQueue = new TaskQueue();
Thread t1 = new Thread(() -> taskQueue.processTasks());
Thread t2 = new Thread(() -> taskQueue.enqueueTask(() -> {
// 执行任务
}));
t1.start();
t2.start();
示例说明
示例一:使用ArrayBlockingQueue实现生产者消费者模式
public class ProducerConsumerDemo {
private static class Producer implements Runnable {
private ArrayBlockingQueue<Integer> queue;
public Producer(ArrayBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("生产者插入元素:" + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable {
private ArrayBlockingQueue<Integer> queue;
public Consumer(ArrayBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
int element = queue.take();
System.out.println("消费者取出元素:" + element);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
在示例中我们创建了两个线程,一个是生产者线程,用于向队列中插入元素;一个是消费者线程,用于从队列中删除元素。
示例二:使用ArrayBlockingQueue实现线程池
public class ThreadPoolDemo {
private ArrayBlockingQueue<Runnable> queue;
private List<WorkerThread> threads;
private volatile boolean shutdown = false;
private class WorkerThread extends Thread {
public void run() {
while (!shutdown) {
try {
Runnable task = queue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public ThreadPoolDemo(int poolSize, int queueSize) {
queue = new ArrayBlockingQueue<>(queueSize);
threads = new ArrayList<>(poolSize);
for (int i = 0; i < poolSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start();
threads.add(workerThread);
}
}
public void execute(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void shutdown() {
shutdown = true;
for (WorkerThread thread : threads) {
thread.interrupt();
}
}
}
// 在主线程中使用
ThreadPoolDemo threadPool = new ThreadPoolDemo(5, 10);
for (int i = 0; i < 20; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println("执行任务" + index);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
在示例中我们创建了一个线程池,包含5个工作线程和一个容量为10的任务队列。我们通过循环向线程池中提交20个任务,每个任务打印执行的编号,并暂停2秒。最后调用shutdown方法来关闭线程池。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:一文探究ArrayBlockQueue函数及应用场景 - Python技术站