Java并发编程之阻塞队列(BlockingQueue)详解
什么是阻塞队列?
阻塞队列,顾名思义就是在队列的基础上加入了阻塞的特性。当队列满时,阻塞队列会自动阻塞写入线程,直到队列中有元素被移除,而当队列为空时,阻塞队列会自动阻塞读取线程,直到队列中有元素被添加。
Java中的阻塞队列是一个线程安全的队列,实现了如同锁的机制,可以保证多个线程同时访问是安全的,并且提供了多种阻塞方式,适合于不同的业务场景。
阻塞队列的分类
Java中提供了多种不同类型的阻塞队列,常用的有以下几种:
ArrayBlockingQueue
ArrayBlockingQueue是一个由数组构成的有界阻塞队列,队列的容量在创建的时候就固定了,对元素的添加和移除都会阻塞线程,直到成功为止。
可以使用以下方式创建一个ArrayBlockingQueue:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
LinkedBlockingQueue
LinkedBlockingQueue是一个由链表构成的有界或无界阻塞队列,如果队列容量没有限制,则对元素的添加不会阻塞线程(除非内存不足),而对元素的移除会阻塞线程,直到队列中有元素被添加。
可以使用以下方式创建一个LinkedBlockingQueue:
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
SynchronousQueue
SynchronousQueue是一个没有容量的阻塞队列,对元素的添加和移除都会阻塞线程,直到另外一个线程正在等待接收被添加的元素为止。
可以使用以下方式创建一个SynchronousQueue:
BlockingQueue<String> queue = new SynchronousQueue<>();
阻塞队列的基本操作
put方法
put方法会将指定元素插入到队列的尾部并阻塞线程,直到成功添加为止。如果队列已经满了,则线程会一直阻塞。
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
queue.put("hello");
take方法
take方法会将队列的头部元素移除并返回,如果队列为空,则该方法会一直阻塞线程,直到队列中有元素被添加。
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
String str = queue.take();
示例
生产者消费者模式
下面使用阻塞队列来实现一个简单的生产者消费者模式。假设有两个线程,一个线程负责生产数据并将数据放入阻塞队列中,另一个线程负责从阻塞队列中取数据并进行消费。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
}
static class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
String data = "data-" + i;
queue.put(data);
System.out.println(Thread.currentThread().getName() + " 生产了 " + data);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String data = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费了 " + data);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在上面的示例中,Producer线程会不断地生产数据并将数据放入阻塞队列中,而Consumer线程则会不断地从阻塞队列中取数据并进行消费。由于阻塞队列的阻塞特性,当队列满了时,生产者线程会被阻塞,直到队列中有数据被消费;当队列为空时,消费者线程会被阻塞,直到队列中有数据被添加。
限流
下面使用阻塞队列来实现一个限流的场景。假设有一个API接口,限制每秒钟只能处理10个请求,超过10个请求的需要被丢弃。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class RateLimitDemo {
private static int MAX_QPS = 10;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(MAX_QPS);
ExecutorService executor = Executors.newCachedThreadPool();
// 开启10个线程不断往队列中添加请求
for (int i = 0; i < MAX_QPS; i++) {
executor.execute(() -> {
while (true) {
try {
queue.put("request");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 开启一个线程处理请求
executor.execute(() -> {
while (true) {
try {
// 从队列中取出请求
String request = queue.take();
System.out.println("正在处理请求:" + request);
TimeUnit.MILLISECONDS.sleep(100); // 模拟处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
在上面的示例中,首先我们创建了一个容量为10的阻塞队列,并开启了10个线程不断地往队列中添加请求。由于队列的容量为10,因此最多可以处理10个请求。同时,我们开启了一个线程来处理请求,该线程会不断地从队列中取出请求并进行处理。由于队列的阻塞特性,如果队列已经满了(即已经有10个请求等待处理),新来的请求会阻塞线程,直到队列中有请求被处理。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发编程之阻塞队列(BlockingQueue)详解 - Python技术站