详解Java并发编程中的优先级队列PriorityBlockingQueue
什么是优先级队列?
优先级队列是一种具有特殊约束条件的队列,它将每个元素赋予一个优先级。具有高优先级的元素将先被取出,而低优先级的元素将后被取出。优先级队列广泛应用于任务调度和资源分配等领域。
介绍PriorityBlockingQueue
PriorityBlockingQueue是JDK 1.5中提供的一个线程安全的优先级队列,它是一个无边界的、可阻塞的队列。当队列为空时,消费者线程将被阻塞直到有新的元素加入。当队列已满时,生产者线程将被阻塞直到队列空闲。PriorityBlockingQueue内部通过堆实现,堆的根节点是优先级最高的元素。
PriorityBlockingQueue的主要方法有以下几种:
- add(E e): 将元素插入到队列中。如果队列已满,则抛出异常。
- put(E e): 将元素插入到队列中。如果队列已满,则阻塞等待直到队列空闲。
- offer(E e): 将元素插入到队列中。如果队列已满,则返回false。
- take(): 移除并返回队列中的头元素。如果队列为空,则阻塞等待直到有新的元素加入。
- poll(): 移除并返回队列中的头元素。如果队列为空,则返回null。
- remove(Object o): 在队列中移除指定元素。
- clear(): 清空队列中所有元素。
- size(): 返回队列中元素的数量。
示例1:使用PriorityBlockingQueue进行任务调度
import java.util.concurrent.*;
public class TaskScheduler {
private static class Task implements Comparable<Task>{
private String name;
private int priority;
public Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task o) {
return Integer.compare(o.priority, priority);
}
public void run() {
System.out.println("Task " + name + " is running!");
}
}
public static void main(String[] args) {
// 创建一个PriorityBlockingQueue
PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
// 创建并启动5个线程,不断从队列中取出任务并执行
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
while (true) {
Task task = taskQueue.take();
task.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 将10个任务插入到队列中
for (int i = 0; i < 10; i++) {
Task task = new Task("Task-" + i, i);
taskQueue.put(task);
}
}
}
上述代码演示了如何使用PriorityBlockingQueue进行任务调度。每个任务有一个名称和一个优先级,任务被按照优先级从高到低依次执行。程序启动时,创建一个PriorityBlockingQueue和5个线程。每个线程会不断从队列中取出任务并执行。随后将10个任务插入到队列中,任务将按照优先级被执行。
示例2:使用PriorityBlockingQueue进行日志收集
import java.time.LocalDateTime;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
public class LogCollector {
private static class LogEntry implements Comparable<LogEntry> {
private LocalDateTime timestamp;
private String message;
public LogEntry(String message) {
this.timestamp = LocalDateTime.now();
this.message = message;
}
@Override
public int compareTo(LogEntry o) {
return timestamp.compareTo(o.timestamp);
}
public String toString() {
return timestamp.toString() + " " + message;
}
}
private static PriorityBlockingQueue<LogEntry> logQueue = new PriorityBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
// 创建并启动一个线程,不断从队列中取出日志并输出
new Thread(() -> {
try {
while (true) {
LogEntry entry = logQueue.poll(1, TimeUnit.SECONDS);
if (entry != null) {
System.out.println(entry.toString());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 模拟日志收集,不同的日志有不同的时间戳
for (int i = 0; i < 20; i++) {
String message = "message-" + i;
if (i % 3 == 0) {
Thread.sleep(1000);
} else if (i % 5 == 0) {
Thread.sleep(500);
}
LogEntry entry = new LogEntry(message);
logQueue.offer(entry);
}
}
}
上述代码演示了如何使用PriorityBlockingQueue进行日志收集。每个日志条目有一个时间戳和一条消息。程序启动时,创建一个PriorityBlockingQueue和一个线程。这个线程会不断从队列中取出日志并输出。随后模拟20个日志条目,不同的日志有不同的时间戳。日志条目被插入到队列中,按照时间戳从早到晚被输出。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:详解Java并发编程中的优先级队列PriorityBlockingQueue - Python技术站