Java高并发BlockingQueue重要的实现类详解
概述
在Java中,BlockingQueue是一种很重要的线程安全容器,它提供了线程安全的数据存储和获取操作,用于在多线程并发场景中实现生产者-消费者模式的应用。本文将详细介绍BlockingQueue的相关实现类,包括ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue。
ArrayBlockingQueue
ArrayBlockingQueue是通过数组实现的阻塞队列,其容量固定,不能动态增加或减少。ArrayBlockingQueue的元素插入和删除都会阻塞,直到对应的操作可以被执行,所以是一个“阻塞”的队列。在多线程并发场景中,生产者生产的速度很快,如果队列满了,则会阻塞当前线程,直到队列中有元素被消费。另外,ArrayBlockingQueue是公平的队列,即多个线程访问队列时,它们的顺序与请求的顺序一样。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
new Thread(() -> {
try {
queue.put("A");
queue.put("B");
queue.put("C");
System.out.println(Thread.currentThread().getName() + " 生产 ABC 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Producer").start();
new Thread(() -> {
try {
Thread.sleep(1000); // 线程 B 在 1000ms 后开始消费
queue.take();
queue.take();
System.out.println(Thread.currentThread().getName() + " 消费 AB 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer1").start();
new Thread(() -> {
try {
Thread.sleep(2000); // 线程 C 在 2000ms 后开始消费
queue.take();
System.out.println(Thread.currentThread().getName() + " 消费 C 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer2").start();
}
}
上面的示例代码中,创建了一个大小为 3 的ArrayBlockingQueue,启动一个生产者线程生产 ABC 三个元素,启动两个消费者线程分别消费两个元素和一个元素。由于队列大小为 3,所以生产者生产完 ABC 后就会阻塞等待,直到有消费者线程消费元素。消费者线程也会阻塞等待队列中有元素才开始消费。
LinkedBlockingQueue
LinkedBlockingQueue是通过链表实现的阻塞队列,其容量可以动态增加或减少(没有指定容量则使用默认容量Integer.MAX_VALUE)。LinkedBlockingQueue同样是一个“阻塞”的队列,在多线程并发场景中,当队列满了时,生产者线程会被阻塞,直到队列中有元素被消费;当队列为空时,消费者线程会被阻塞,直到队列中有元素被生产。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
new Thread(() -> {
try {
queue.put("A");
queue.put("B");
queue.put("C"); // 生产 C 时会被阻塞,因为队列已满
System.out.println(Thread.currentThread().getName() + " 生产 ABC 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Producer").start();
new Thread(() -> {
try {
Thread.sleep(1000); // 线程 B 在 1000ms 后开始消费
queue.take();
queue.take();
System.out.println(Thread.currentThread().getName() + " 消费 AB 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer1").start();
new Thread(() -> {
try {
Thread.sleep(2000); // 线程 C 在 2000ms 后开始消费
queue.take();
System.out.println(Thread.currentThread().getName() + " 消费 C 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer2").start();
}
}
上面的示例代码中,创建了一个大小为 2 的LinkedBlockingQueue,启动一个生产者线程生产 ABC 三个元素,启动两个消费者线程分别消费两个元素和一个元素。由于队列大小为 2,所以生产者生产完 AB 后就会阻塞等待,直到有消费者线程消费元素。消费者线程也会阻塞等待队列中有元素才开始消费。
SynchronousQueue
SynchronousQueue是一种直接传递队列,不会保存任何元素,每一个插入操作必须等待另一个线程的移除操作,反之亦然。在多线程并发场景中,SynchronousQueue是一个异步交换数据的容器,生产者线程会等待消费者线程处理数据,消费者线程也会等待生产者线程提供数据。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
queue.put("A"); // 生产 A 后会等待消费者消费掉 A
System.out.println(Thread.currentThread().getName() + " 生产 A 完成");
queue.put("B"); // 生产 B 后会等待消费者消费掉 B
System.out.println(Thread.currentThread().getName() + " 生产 B 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Producer").start();
new Thread(() -> {
try {
Thread.sleep(1000); // 线程 B 在 1000ms 后开始消费
queue.take(); // 消费 A
System.out.println(Thread.currentThread().getName() + " 消费 A 完成");
Thread.sleep(2000); // 线程 C 在 2000ms 后开始消费
queue.take(); // 消费 B
System.out.println(Thread.currentThread().getName() + " 消费 B 完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer").start();
}
}
上面的示例代码中,创建了一个SynchronousQueue,启动一个生产者线程生产 A 和 B 两个元素,启动一个消费者线程分别消费 A 和 B。由于SynchronousQueue是一种同步传递队列,生产者和消费者线程之间必须一对一收发消息,每个插入操作必须等待另一个线程的移除操作,反之亦然,所以在生产者生产 A 和 B 后会等待消费者消费掉,消费者线程也会等待生产者线程提供数据。
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,其排序原则是自然顺序(数字从小到大、字符串按字典序)或比较器顺序,与普通队列不同的是,PriorityBlockingQueue内部是无锁的,它使用了可重入锁 ReentrantLock、条件变量 Condition 和 CAS等技术来保证线程安全。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueDemo {
static class Person implements Comparable<Person> {
private String name;
private Integer age;
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
@Override
public int compareTo(Person o) {
return age.compareTo(o.age);
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
public static void main(String[] args) {
BlockingQueue<Person> queue = new PriorityBlockingQueue<>();
queue.add(new Person("Tom", 18));
queue.add(new Person("Jim", 22));
queue.add(new Person("Jack", 20));
System.out.println("队首元素:" + queue.peek());
System.out.println("队列长度:" + queue.size());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
//System.out.println(queue.poll()); // 队列为空时,继续出列会返回null
}
}
上面的示例代码中,创建了一个大小为 3 的PriorityBlockingQueue,元素类型为Person,它根据每个Person实例的年龄age属性进行优先级排序。元素插入时会根据年龄自动排序,元素出队时先出优先级最高的元素。另外,PriorityBlockingQueue是一个无界队列,在没有插入元素时,它的队列长度为 0。
总结
BlockingQueue是一个很实用、很重要的线程安全容器,它提供了线程安全的数据存储和获取操作,用于在多线程并发场景中实现生产者-消费者模式的应用。本文介绍了Java高并发情况下常用的BlockingQueue实现类,包括ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue。在实际开发中,要根据场景选择合适的BlockingQueue实现类,以提高程序的可靠性和性能。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java高并发BlockingQueue重要的实现类详解 - Python技术站