ConcurrentLinkedQueue 源码分析

TOP 带着问题看源码

  1. ConcurrentLinkedQueue 是怎么保证线程安全的
  2. ConcurrentLinkedQueue 和 LinkedBlockingQueue 区别是啥
  3. ConcurrentLinkedQueue 的应用场景

1. 基本介绍

ConcurrentLinkedQueue 是一个线程安全且 非阻塞无界 队列,它采用先进先出的规则,实现了 AbstractQueue 基础抽象类和 Queue 接口。

它在内部维护了一个 Node 类,有存放数据的 item 和 执行下一个节点的指针 next,全部通过 CAS 来操作。

2. 成员变量分析

在前面分析阻塞队列 ArrayBlockingQueue 时候,我们发现对数据的增删都是从数组的第一个元素进行操作, 这里我们可以把 head 节点理解为相同的索引作用。

1
2
3
4
5
6
7
8
9
10
// 头节点
private transient volatile Node<E> head;
// 尾节点
private transient volatile Node<E> tail;

// ConcurrentLinkedQueue.Node
// 存放数据
volatile E item;
// 下一个节点指针
volatile Node<E> next;

3. 核心方法分析

3.1 入队列操作

3.1.1 offer(E e)

入队列的API,核心逻辑就是:

  1. 把数据封装为 node 节点,设置为当前尾结点的 next 节点,失败就重试
  2. 每两次 CAS 更新 next 节点 更新一次 tail 节点
  3. 重置尾结点 p
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 循环入队,直到成功
// t 是tail节点
// p 是链表此时此刻的尾结点(也可以理解为入队节点)
for (Node<E> t = tail, p = t;;) {
// q 是尾结点的 next 节点
Node<E> q = p.next;
// 如果q为null,说明到链表尾部了
if (q == null) {
// CAS 更新 next为新节点,失败就重试
if (p.casNext(null, newNode)) {
// 然后更新tail节点
// 这里采用的是一种巧妙的累计更新,也就是说下个 CAS p才会不等于tail
// 相当于循环两次更新一次 tail,所以才有了最下面的两个判断来设置尾结点p
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
// p 和 p的next都为空(非对象为空,还是有next的)
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
// 重置 p节点为下一个入队节点(这一步可以理解为校准,入队节点最终永远要指向最新的尾结点)
p = (p != t && t != (t = tail)) ? t : q;
}
}

3.2 出队列操作

3.2.1 poll()

可以看到 poll 的大概代码设计是和 offer 差不多的,这里是把 p 做为 头节点来维护(出队节点),同样是每两次 CAS 更新数据,更新一次头节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
restartFromHead:
for (;;) {
// 从头开始
for (Node<E> h = head, p = h, q;;) {
// 取数据
E item = p.item;
// CAS 置null
if (item != null && p.casItem(item, null)) {
// 更新 head 节点
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 队列为空返回 null(可以看到是非阻塞的)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
// 重置p节点尾下一个出队节点
else
p = q;
}
}
}

回到问题 TOP 2 ,通过对入队出队的分析,可以分析 ConcurrentLinkedQueue 和 LinkedBlockingQueue 的区别主要还是 一个是非阻塞,一个是阻塞,两者都是线程安全的。

回到问题 TOP 3 ,那场景也就是线程安全且不需要阻塞功能的常规内存队列场景。

3.3 辅助操作

3.3.1 isEmpty()

判断队列是否为空是通过判断头节点是否为空来实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean isEmpty() {
// 查看头节点是否为空
return first() == null;
}

Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取头节点的值,顺带更新一波 Head
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

3.3.2 size()

遍历队列,计数,效率较差

1
2
3
4
5
6
7
8
9
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}

通过对以上的核心方法的分析,回到问题 TOP 1 可以明白基本都是采用 CAS 自旋来实现的,保证了线程的安全性

4. 总结

对比阻塞队列,ConcurrentLinkedQueue 没有条件变量、锁等那些复杂的东西,代码设计层面尽量是简洁、巧妙。