privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; // 前驱节点是唤醒状态 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // ws>0 代表前驱节点取消了排队 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { // 因为依赖前驱节点的唤醒,所以前驱不能是取消状态,再往前找,一直找到前驱不是取消状态的才停止 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // 排除 ws = -1 和 ws > 0,加上前面初始化节点并没有看见设置 waitStatus // 所以进入这个分支的也就是 waitStatus 为0 // 使用 CAS 把前驱节点状态设置为 唤醒状态,再次循环时候就会从第一个分支 return true } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
finalintnonfairTryAcquireShared(int acquires){ for (;;) { // 获取许可数量 int available = getState(); // 减去当前要获取的许可的剩余的许可数量 int remaining = available - acquires; // CAS 修改许可的数量,如果小于0,则返回负数,在上一层调用的时候如果为负数会加入 AQS 的队列 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
再来看看公平锁的实现,相信看到 hasQueuedPredecessors 你又懂了
1 2 3 4 5 6 7 8 9 10 11
protectedinttryAcquireShared(int acquires){ for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
同获取锁 acquire 不一样,这里是子类实现返回 true才往下走(后面我们会知道,这个是指是否完全释放),后面会调用 unparkSuccessor 方法来唤醒后继节点,需要注意的是传入的节点是 head 节点
1 2 3 4 5 6 7 8 9
publicfinalbooleanrelease(int arg){ if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; }
unparkSuccessor 方法核心逻辑就是唤醒 head 的后继节点,如果后继节点的状态不是需要被唤醒的状态,就从后往前找到 waitStatus 是唤醒状态的最前面的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
privatevoidunparkSuccessor(Node node){ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 有可能后继节点取消了等待 if (s == null || s.waitStatus > 0) { s = null; // 从后往前遍历 for (Node t = tail; t != null && t != node; t = t.prev) // 找到最前面的一个waitStatus <= 0的节点,赋值给 s 等待被唤醒 if (t.waitStatus <= 0) s = t; } // 后继节点正常且不为空就唤醒 if (s != null) LockSupport.unpark(s.thread); }
protectedfinalbooleantryReleaseShared(int releases){ for (;;) { // 当前许可数量,因为有可能有竞争,所以每次自旋后重新获取许可数量进行归还 int current = getState(); // 新增许可数量 int next = current + releases; if (next < current) // overflow thrownew Error("Maximum permit count exceeded"); // CAS 更新许可数量,失败就重试 if (compareAndSetState(current, next)) returntrue; } }