Java 并发中 DelayQueue 延迟队列原理剖析
DelayQueue 是 Java 并发包中提供的一种特殊队列,它能够在一定的时间内延迟一些操作的执行。下面就来深入了解一下 DelayQueue 的原理。
DelayQueue 的基本特点
DelayQueue 继承自 java.util.concurrent.Delayed 接口,它的元素必须要实现该接口以表明在队列中的延迟过期时间以及当元素到达过期时间后应该怎样处理。DelayQueue 的一些基本特点如下:
- 允许 null 元素。
- 延迟元素必须实现 Delayed 接口,其中判断是否到期的方法是 getDelay(),其返回值表示过期时间与当前时间的差值。
- 如果队列中没有元素到期,那么就会一直阻塞等待,这样可以保证队列中先到期的元素先被处理。
DelayQueue 的源码分析
下面是 DelayQueue 的部分源码,以及注释解释:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 等待在 DelayQueue 中的元素个数,实际上就是 q 的 size()
private int size;
// 把一个元素放到 DelayQueue 中,这个方法会返回 false,因为 put() 永远不会阻塞
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 将元素插入到优先队列 q 中
if (q.peek() == e) { // 如果新插入的元素变成了队首,那么需要唤醒可能正在等待的线程
leader = null;
available.signal();
}
size++;
return true;
} finally {
lock.unlock();
}
}
// 获取队首元素,但是如果队首没有到期,那么该方法会阻塞直到超时
private E peekExpired() {
// 获取队首元素,但不弹出
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return first;
}
// 从 DelayQueue 中弹出一个元素,如果没有到期,那么返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = peekExpired(); // 获取队首元素,但是如果队首没有到期,那么该方法会阻塞直到超时
if (first == null)
return null;
E x = q.poll(); // 弹出队首元素
assert x == first;
size--;
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}
从上面的源码可知,DelayQueue 内部数据结构是基于 PriorityQueue 实现的。因为 PriorityQueue 是一种优先队列,是一种能够自动排序的队列,并且重要的元素都放在队首,而 DelayQueue 的每个元素都有一个过期时间,所以将 DelayQueue 实现为基于 PriorityQueue 的高级队列是较为合理的。
下面给出一段使用 DelayQueue 的基本示例代码:
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Message> delayQueue = new DelayQueue<>();
delayQueue.put(new Message("Message ID 1", 5000L));
delayQueue.put(new Message("Message ID 2", 1000L));
delayQueue.put(new Message("Message ID 3", 2000L));
delayQueue.put(new Message("Message ID 4", 1500L));
// 模拟实时接收消息并处理
while (!delayQueue.isEmpty()) {
Message message = delayQueue.take();
System.out.println(new Date() + ": " + message);
}
}
}
// 该类是 DelayQueue 中元素的实体类,必须实现 Delayed 接口
class Message implements Delayed {
private String id;
private Long delayTime; // 延迟到期时间
public Message(String id, Long delayTime) {
this.id = id;
this.delayTime = System.currentTimeMillis() + delayTime;
}
// 返回值是元素的剩余延迟时间,当返回值小于等于 0 时,表示到期,可以从队列中取出
@Override
public long getDelay(TimeUnit unit) {
return delayTime - System.currentTimeMillis();
}
// 用于计数器才有的,作用是更快的比较两个元素大小,如果返回 0,则表示两个元素相等
@Override
public int compareTo(Delayed o) {
return this.getDelay(TimeUnit.SECONDS) > o.getDelay(TimeUnit.SECONDS) ? 1 : -1;
}
@Override
public String toString() {
return "Message{" +
"id='" + id + '\'' +
", delayTime=" + delayTime +
'}';
}
}
上面的示例代码就是一个简单的基于 DelayQueue 的消息队列。在这个示例中,我们使用了 DelayQueue 来传递一些消息,但是每个消息到达后会被指定一个延迟时间,也就是在延迟时间之后才能处理该消息。在 while 循环内部使用 delayQueue.take() 方法从队列中取出一个元素,该方法会阻塞等待,直到队列中有元素到期。在控制台输出中可以看到每个元素出队列的时间以及它的 ID。
同时,内部也为 DelayQueue 中元素的实体类提供了一个基本示例,该类必须实现 Delayed 接口,并可以通过实现该接口中的两个方法分别来规定每个元素的延迟时间和具体的处理逻辑。
以上就是 Java 并发中 DelayQueue 延迟队列原理剖析的核心内容。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:java并发中DelayQueue延迟队列原理剖析 - Python技术站