AbstractQueuedSynchronizer 源码分析

TOP 带着问题看源码

  1. AQS 的数据结构
  2. AQS 的设计模式
  3. AQS 的核心思想

1. 基本介绍

前面我们已经介绍和分析了管程,以及 JVM 层面的管程而 AQS 则是 Java 并发包中管程的一种实现。

下面是 AQS 的类实现关系图

img

2. 成员变量分析

1
2
3
4
5
6
7
8
9
// 头结点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// AbstractOwnableSynchronizer.class
// 当前持有独占锁的线程,类似对象头的Thread ID,可以用来判断是否为重入
private transient Thread exclusiveOwnerThread;

2.1 Node 节点分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 节点状态,有下面几种取值
volatile int waitStatus;
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 当前线程取消了锁的竞争
static final int CANCELLED = 1;
// 后继节点需要被唤醒
static final int SIGNAL = -1;
// 当前节点线程正在等待条件
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 双向链表
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点封装的线程
volatile Thread thread;
// 条件队列的单向链表
Node nextWaiter;

回到问题 TOP 1 ,可以分析得到 AQS 的数据结构是一个双向链表,并维护了一个全局状态

3. 核心方法分析

3.1 尝试获取锁

tryAcquire 这个方法 AQS 是定义了一个空方法,交由子类自行实现,这里也是采用了 模板设计模式

我们先暂时理解 tryAcquire 是尝试获取一下锁,后面会结合具体实现类来分析

可以看到如果 tryAcquire(arg) 返回 true,方法就结束了,如果返回 false 则往下走加入队列,加入成功就设置中断状态

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

接下来我们来分析 addWaiter(Node.EXCLUSIVE)

如果你看过之前分析 Synchronized 那一篇的重量锁阶段相信你看到这会发现很熟悉,没错!就是在竞争失败后把当前线程封装到 node 节点(独占模式),与 Synchronized 不同的是 node 不是放在队列头部而是塞到队列的队尾处。

核心逻辑就是通过 CAS 自旋进行入队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列不为空 直接 CAS 入队,成功直接返回
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列为空 或 CAS竞争失败 调用 enq 自旋入队
enq(node);
return node;
}

我们来看下 enq(node) 的逻辑,很简单就是:

  1. 队列为空就初始化 head 节点(初始化之前是null,这里new Node中waitStatus是0) ,队列不为空后就继续循环走下面入队的逻辑
  2. 不为空说明是 CAS 竞争失败,尝试自旋入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquire(arg) 中的 addWaiter(Node.EXCLUSIVE) 已经分析完了,接下来我们分析外层的 acquireQueued() 方法。

上面逻辑已经把节点放入队列了,接下来的逻辑就是会把放入队列的节点不断获取锁,直到成功或者中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是队头就尝试获取锁,因为这个节点有可能是刚初始化的
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 注意,唯一 return 跳出方法的地方
return interrupted;
}
// 说明要么上面分支没获取到锁,要么不是头节点
// 接下来我们分析 shouldParkAfterFailedAcquire
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 根据 return 之前的赋值可以知道,failed 为 true 只有 tryAcquire 异常时候会出现
if (failed)
// 将 node 节点设置为 CANCELLED 状态
cancelAcquire(node);
}
}

接下来我们分析 shouldParkAfterFailedAcquire(p, node) 方法,注意传过来的第一个节点是 前驱节点 ,第二个是当前节点

该方法核心思想就是判断当前线程是否应该被挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点是唤醒状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws>0 代表前驱节点取消了排队
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 因为依赖前驱节点的唤醒,所以前驱不能是取消状态,再往前找,一直找到前驱不是取消状态的才停止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 排除 ws = -1 和 ws > 0,加上前面初始化节点并没有看见设置 waitStatus
// 所以进入这个分支的也就是 waitStatus 为0
// 使用 CAS 把前驱节点状态设置为 唤醒状态,再次循环时候就会从第一个分支 return true
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如果上个方法返回为 true ,就会接着调用 park 挂起当前线程。

正常来说 第一次都会为 false,因为 第一次只是设置状态,第二次才会校验状态

3.1.1 tryAcquire(arg) 的实现

我们主要拿 ReentrantLockSemaphore 的实现来举例子

ReentrantLock 有两个版本的实现,一个是公平锁,一个是非公平锁

我们先来看非公平锁,并没有什么特殊处理,就是先尝试获取一下,成功就返回,失败就入 AQS 的等待队列

ReentrantLock.Sync#nonfairTryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c 为 0,说明还没有线程持有锁
if (c == 0) {
// 尝试 CAS 一下,成功就直接返回
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果有线程且是当前线程,说明是重入锁,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

接下来我们来看公平锁,可以看到在非公平的基础上多判断一次 hasQueuedPredecessors , hasQueuedPredecessors 的逻辑很简单,就是判断队列为不为空,如果不为空说明还有等待的,就不往下走了

ReentrantLock.FairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 队列为空,就 CAS 尝试获取一下锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 队列不为空返回 true
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

Semaphore 是 Java 层面实现的一个信号量,也是分为公平和非公平版本,Semaphore 也是基于 AQS 来实现的,它是通过一个许可介质,获取许可就把许可减少,如果许可数小于0,就入队列阻塞等待许可的归还;归还许可的时候就把许可数增加。信号量这块可以参考前面的文章,有专门讲解这个机制。

我们先来看非公平的实现,其实就是对许可的减少

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取许可数量
int available = getState();
// 减去当前要获取的许可的剩余的许可数量
int remaining = available - acquires;
// CAS 修改许可的数量,如果小于0,则返回负数,在上一层调用的时候如果为负数会加入 AQS 的队列
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

再来看看公平锁的实现,相信看到 hasQueuedPredecessors 你又懂了

1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

3.2 尝试释放锁

tryAcquire 一样, tryRelease 也是 AQS 中的一个模板方法,我们后面会分析 tryRelease 的具体实现,我们先来分析 release

同获取锁 acquire 不一样,这里是子类实现返回 true才往下走(后面我们会知道,这个是指是否完全释放),后面会调用 unparkSuccessor 方法来唤醒后继节点,需要注意的是传入的节点是 head 节点

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor 方法核心逻辑就是唤醒 head 的后继节点,如果后继节点的状态不是需要被唤醒的状态,就从后往前找到 waitStatus 是唤醒状态的最前面的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 有可能后继节点取消了等待
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到最前面的一个waitStatus <= 0的节点,赋值给 s 等待被唤醒
if (t.waitStatus <= 0)
s = t;
}
// 后继节点正常且不为空就唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

3.2.1 tryRelease(arg) 的实现

同样的,我们主要拿 ReentrantLockSemaphore 的实现来举例子

先来看 ReentrantLock 的实现,如果可重入的次数已经减少完了就返回true,走 AQS 的模板方法调用唤醒操作

ReentrantLock.Sync#tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 这里考虑到了是否为重入锁,也就是是否完全释放,
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新状态数
setState(c);
return free;
}

然后来看下 Semaphore 的实现,更新许可,成功则返回 true,走 AQS 的模板进行唤醒操作

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 当前许可数量,因为有可能有竞争,所以每次自旋后重新获取许可数量进行归还
int current = getState();
// 新增许可数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新许可数量,失败就重试
if (compareAndSetState(current, next))
return true;
}
}

通过对核心方法的分析,我们可以知道 AQS 定义了很多模板方法,扩展逻辑交由子类实现。

回到问题 TOP 2 ,可以知道采用的设计模式是模板设计模式

4. 总结

由上面对成员变量和核心方法的分析,我们可以看到 AQS 这个管程的实现其实和概念上是相同的,就是对队列和状态值的一个维护,也可以明白 Java 为什么使用管程为核心实现同步,其优势是面向对象,把复杂逻辑封装起来,对于使用更友好。

同样我们对 Semaphore 这个 Java 层面的信号量实现的分析,也明白了管程那篇文章中写的 管程和信号量等价 ,因为我们可以使用管程来实现信号量,也可以使用信号量来实现管程,只是管程对我们更加友好!

以上的总结也是对问题 TOP 3 的一个回答。