AbstractQueuedSynchronizer 源码分析

摘要:

  1. AQS 的数据结构
  2. AQS 的设计模式
  3. AQS 的核心思想

TOP 带着问题看源码

  1. AQS 的数据结构
  2. AQS 的设计模式
  3. AQS 的核心思想

1. 基本介绍

前面我们已经介绍和分析了管程,以及 JVM 层面的管程而 AQS 则是 Java 并发包中管程的一种实现。

下面是 AQS 的类实现关系图

2. 成员变量分析

1
2
3
4
5
6
7
8
9
// 头结点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// AbstractOwnableSynchronizer.class
// 当前持有独占锁的线程,类似对象头的Thread ID,可以用来判断是否为重入
private transient Thread exclusiveOwnerThread;

2.1 Node 节点分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 节点状态,有下面几种取值
volatile int waitStatus;
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 当前线程取消了锁的竞争
static final int CANCELLED = 1;
// 后继节点需要被唤醒
static final int SIGNAL = -1;
// 当前节点线程正在等待条件
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 双向链表
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点封装的线程
volatile Thread thread;
// 条件队列的单向链表
Node nextWaiter;

回到问题 TOP 1 ,可以分析得到 AQS 的数据结构是一个双向链表,并维护了一个全局状态

3. 核心方法分析

3.1 尝试获取锁

tryAcquire 这个方法 AQS 是定义了一个空方法,交由子类自行实现,这里也是采用了 模板设计模式

我们先暂时理解 tryAcquire 是尝试获取一下锁,后面会结合具体实现类来分析

可以看到如果 tryAcquire(arg) 返回 true,方法就结束了,如果返回 false 则往下走加入队列,加入成功就设置中断状态

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

接下来我们来分析 addWaiter(Node.EXCLUSIVE)

如果你看过之前分析 Synchronized 那一篇的重量锁阶段相信你看到这会发现很熟悉,没错!就是在竞争失败后把当前线程封装到 node 节点(独占模式),与 Synchronized 不同的是 node 不是放在队列头部而是塞到队列的队尾处。

核心逻辑就是通过 CAS 自旋进行入队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列不为空 直接 CAS 入队,成功直接返回
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列为空 或 CAS竞争失败 调用 enq 自旋入队
enq(node);
return node;
}

我们来看下 enq(node) 的逻辑,很简单就是:

  1. 队列为空就初始化 head 节点(初始化之前是null,这里new Node中waitStatus是0) ,队列不为空后就继续循环走下面入队的逻辑
  2. 不为空说明是 CAS 竞争失败,尝试自旋入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquire(arg) 中的 addWaiter(Node.EXCLUSIVE) 已经分析完了,接下来我们分析外层的 acquireQueued() 方法。

上面逻辑已经把节点放入队列了,接下来的逻辑就是会把放入队列的节点不断获取锁,直到成功或者中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是队头就尝试获取锁,因为这个节点有可能是刚初始化的
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 注意,唯一 return 跳出方法的地方
return interrupted;
}
// 说明要么上面分支没获取到锁,要么不是头节点
// 接下来我们分析 shouldParkAfterFailedAcquire
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 根据 return 之前的赋值可以知道,failed 为 true 只有 tryAcquire 异常时候会出现
if (failed)
// 将 node 节点设置为 CANCELLED 状态
cancelAcquire(node);
}
}

接下来我们分析 shouldParkAfterFailedAcquire(p, node) 方法,注意传过来的第一个节点是 前驱节点 ,第二个是当前节点

该方法核心思想就是判断当前线程是否应该被挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点是唤醒状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws>0 代表前驱节点取消了排队
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 因为依赖前驱节点的唤醒,所以前驱不能是取消状态,再往前找,一直找到前驱不是取消状态的才停止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 排除 ws = -1 和 ws > 0,加上前面初始化节点并没有看见设置 waitStatus
// 所以进入这个分支的也就是 waitStatus 为0
// 使用 CAS 把前驱节点状态设置为 唤醒状态,再次循环时候就会从第一个分支 return true
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如果上个方法返回为 true ,就会接着调用 park 挂起当前线程。

正常来说 第一次都会为 false,因为 第一次只是设置状态,第二次才会校验状态

3.1.1 tryAcquire(arg) 的实现

我们主要拿 ReentrantLockSemaphore 的实现来举例子

ReentrantLock 有两个版本的实现,一个是公平锁,一个是非公平锁

我们先来看非公平锁,并没有什么特殊处理,就是先尝试获取一下,成功就返回,失败就入 AQS 的等待队列

ReentrantLock.Sync#nonfairTryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c 为 0,说明还没有线程持有锁
if (c == 0) {
// 尝试 CAS 一下,成功就直接返回
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果有线程且是当前线程,说明是重入锁,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

接下来我们来看公平锁,可以看到在非公平的基础上多判断一次 hasQueuedPredecessors , hasQueuedPredecessors 的逻辑很简单,就是判断队列为不为空,如果不为空说明还有等待的,就不往下走了

ReentrantLock.FairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 队列为空,就 CAS 尝试获取一下锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 队列不为空返回 true
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

Semaphore 是 Java 层面实现的一个信号量,也是分为公平和非公平版本,Semaphore 也是基于 AQS 来实现的,它是通过一个许可介质,获取许可就把许可减少,如果许可数小于0,就入队列阻塞等待许可的归还;归还许可的时候就把许可数增加。信号量这块可以参考前面的文章,有专门讲解这个机制。

我们先来看非公平的实现,其实就是对许可的减少

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取许可数量
int available = getState();
// 减去当前要获取的许可的剩余的许可数量
int remaining = available - acquires;
// CAS 修改许可的数量,如果小于0,则返回负数,在上一层调用的时候如果为负数会加入 AQS 的队列
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

再来看看公平锁的实现,相信看到 hasQueuedPredecessors 你又懂了

1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

3.2 尝试释放锁

tryAcquire 一样, tryRelease 也是 AQS 中的一个模板方法,我们后面会分析 tryRelease 的具体实现,我们先来分析 release

同获取锁 acquire 不一样,这里是子类实现返回 true才往下走(后面我们会知道,这个是指是否完全释放),后面会调用 unparkSuccessor 方法来唤醒后继节点,需要注意的是传入的节点是 head 节点

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor 方法核心逻辑就是唤醒 head 的后继节点,如果后继节点的状态不是需要被唤醒的状态,就从后往前找到 waitStatus 是唤醒状态的最前面的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 有可能后继节点取消了等待
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到最前面的一个waitStatus <= 0的节点,赋值给 s 等待被唤醒
if (t.waitStatus <= 0)
s = t;
}
// 后继节点正常且不为空就唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

3.2.1 tryRelease(arg) 的实现

同样的,我们主要拿 ReentrantLockSemaphore 的实现来举例子

先来看 ReentrantLock 的实现,如果可重入的次数已经减少完了就返回true,走 AQS 的模板方法调用唤醒操作

ReentrantLock.Sync#tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 这里考虑到了是否为重入锁,也就是是否完全释放,
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新状态数
setState(c);
return free;
}

然后来看下 Semaphore 的实现,更新许可,成功则返回 true,走 AQS 的模板进行唤醒操作

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 当前许可数量,因为有可能有竞争,所以每次自旋后重新获取许可数量进行归还
int current = getState();
// 新增许可数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新许可数量,失败就重试
if (compareAndSetState(current, next))
return true;
}
}

通过对核心方法的分析,我们可以知道 AQS 定义了很多模板方法,扩展逻辑交由子类实现。

回到问题 TOP 2 ,可以知道采用的设计模式是模板设计模式

4. 总结

由上面对成员变量和核心方法的分析,我们可以看到 AQS 这个管程的实现其实和概念上是相同的,就是对队列和状态值的一个维护,也可以明白 Java 为什么使用管程为核心实现同步,其优势是面向对象,把复杂逻辑封装起来,对于使用更友好。

同样我们对 Semaphore 这个 Java 层面的信号量实现的分析,也明白了管程那篇文章中写的 管程和信号量等价 ,因为我们可以使用管程来实现信号量,也可以使用信号量来实现管程,只是管程对我们更加友好!

以上的总结也是对问题 TOP 3 的一个回答。