DelayQueue 源码分析

TOP 带着问题看源码

  1. DelayQueue 的应用场景
  2. DelayQueue 实现原理

1. 基本介绍

我们先来看一下它的实现类图,它实现了 Delayed、BlockingQueue 接口和 AbstractQueue 基础类,从实现的功能上看,它首先是一个阻塞队列,然后 Delayed 接口是标记给定延迟后执行的对象,结合类名也可以大致的分析出:DelayQueue 是一个 延时阻塞 队列

img

2. 成员变量分析

1
2
3
4
5
6
7
8
// 保证线程安全的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 标记取元素时是否有线程在排队
private Thread leader = null;
// 是否可取的条件变量
private final Condition available = lock.newCondition();

3. 核心方法分析

3.1 入队操作

3.1.1 offer(E e)

入队逻辑很简单:

  1. 把数据加入到优先队列里

  2. 如果添加的元素是堆顶元素

    2.1 leader 置空

    2.2 唤醒 “可取” 条件队列的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

3.2 出队操作

3.2.1 poll()

出队非阻塞API,核心逻辑就是:

  1. 检查堆顶元素,如果为空或者还没到期呢,返回 null
  2. 否则返回取出堆顶元素
1
2
3
4
5
6
7
8
9
10
11
12
13
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}

3.2.2 take()

出队阻塞API,核心逻辑就是:

  1. 检查堆顶元素,如果为空就等待添加元素时候被唤醒重试
  2. 如果不为空,并且已经过期就直接取出来,没过期并且前面没有线程等待,就等待超时时间后唤醒重试
  3. 每次取完都会唤醒 “可取” 条件队列的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶元素
E first = q.peek();
// 为空,等待被唤醒
if (first == null)
available.await();
// 堆顶不为空
else {
// 获取元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 已过期就取走
if (delay <= 0)
return q.poll();
// 没过期往下走
// 这里设置为 null 是放在还有别的线程持有没有释放导致内存泄漏
first = null; // don't retain ref while waiting
// 校验是否有等待线程,有就等待leader线程取完或有新加入的元素唤醒它
if (leader != null)
available.await();
// 没有等待线程,就把自己设置为等待线程,然后等待超时时间唤醒重试
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 唤醒后就把 leader 置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有等待线程并且队列还有数据,就唤醒下一个线程来取
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

回到问题 TOP 2 ,通过对入队列和出队列的分析,其实现原理想必已经明白,就是在队列的基础上增加了时间维度的优先级,然后通过锁和条件变量来控制取/放流程。

4. 总结

和我们开头分析的一样,DelayQueue是一个 阻塞延时且无界 的队列,它使用的是优先级队列+时间维度来实现。回到问题 TOP 1 延时队列场景主要适用于定时任务,但是对于内存中的延时队列往往不能用于重要的业务场景(毕竟还是内存队列,宕机了就没咯),所以可以应用于一些基础类库,不太重要的业务定时清理和处理等。