1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

Merge pull request #2569 from 1020325258/12-21

[docs update]AQS增加共享模式获取、释放资源源码分析
This commit is contained in:
Guide 2024-12-23 22:02:49 +08:00 committed by GitHub
commit 00f5ee850a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 345 additions and 28 deletions

View File

@ -56,21 +56,21 @@ AQSAbstractQueuedSynchronizer在 CLH 锁的基础上进一步优化,形
#### AQS 的性能比较好,原因是什么?
因为 AQS 里使用了 `CAS` + `线程阻塞/唤醒`
因为 AQS 内部大量使用了 `CAS` 操作
在 AQS 的实现里,大量使用了 `CAS` 操作,`CAS` 基于内存地址直接进行数据修改,保证并发安全的同时,性能也很好
AQS 内部通过队列来存储等待的线程节点。由于队列是共享资源,在多线程场景下,需要保证队列的同步访问
但是如果一直通过 `CAS` 操作来更新数据,会比较占用 CPU。因此 AQS 同时结合了 `CAS``线程的阻塞/唤醒` 机制,当 `CAS` 没有成功获取资源时,会对线程进行阻塞,避免一直空转占用 CPU 资源
AQS 内部通过 `CAS` 操作来控制队列的同步访问,`CAS` 操作主要用于控制 `队列初始化``线程节点入队` 两个操作的并发安全。虽然利用 `CAS` 控制并发安全可以保证比较好的性能,但同时会带来比较高的 **编码复杂度**
#### AQS 中为什么 Node 节点需要不同的状态?
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
在 AQS 中,一个节点加入队列之后,初始状态为 `0`
- 状态 `0` :新节点加入队列之后,初始状态为 `0`
当有新的节点加入队列,此时新节点的前继节点状态就会由 `0` 更新为 `SIGNAL` ,表示前继节点释放锁之后,需要对新节点进行唤醒操作。
- 状态 `SIGNAL` 当有新的节点加入队列,此时新节点的前继节点状态就会由 `0` 更新为 `SIGNAL` ,表示前继节点释放锁之后,需要对新节点进行唤醒操作。如果唤醒 `SIGNAL` 状态节点的后续节点,就会将 `SIGNAL` 状态更新为 `0` 。即通过清除 `SIGNAL` 状态,表示已经执行了唤醒操作。
如果一个节点在队列中等待获取锁锁时,因为某种原因失败了,该节点的状态就会变为 `CANCELLED` ,表明取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点。
- 状态 `CANCELLED` 如果一个节点在队列中等待获取锁锁时,因为某种原因失败了,该节点的状态就会变为 `CANCELLED` ,表明取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点。
### AQS 核心思想
@ -135,19 +135,30 @@ protected final boolean compareAndSetState(int expect, int update) {
### Node 节点 waitStatus 状态含义
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
| Node 节点状态 | 值 | 含义 |
| ------------- | --- | ------------------------------------------------------------------------------------------------------------------------- |
| `CANCELLED` | 1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。 |
| `SIGNAL` | -1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
| `CONDITION` | -2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 `signal()` 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
| `PROPAGATE` | -3 | 用于共享模式,在共享模式下,前继节点不仅会唤醒后继节点,同时也可能会唤醒后继节点的后继节点。 |
| `PROPAGATE` | -3 | 用于共享模式。在共享模式下,可能会出现线程在队列中无法被唤醒的情况,因此引入了 `PROPAGATE` 状态来解决这个问题。 |
| | 0 | 加入队列的新节点的初始状态。 |
在 AQS 的源码中,经常使用 `> 0``< 0` 来对 `waitStatus` 进行判断。
如果 `waitStatus > 0` ,表明节点的状态已经取消等待获取资源。
如果 `waitStatus < 0` ,表明节点的处于有效的等待状态
如果 `waitStatus < 0` ,表明节点的状态处于正常的状态,即没有取消等待
因此在 AQS 的源码中,经常使用 `> 0``< 0` 来对 `waitStatus` 进行判断。
其中 `SIGNAL` 状态是最重要的,节点状态流转以及对应操作如下:
| 状态流转 | 对应操作 |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `0` | 新节点入队时,初始状态为 `0` 。 |
| `0 -> SIGNAL` | 新节点入队时,它的前继节点状态会由 `0` 更新为 `SIGNAL``SIGNAL` 状态表明该节点的后续节点需要被唤醒。 |
| `SIGNAL -> 0` | 在唤醒后继节点时,需要清除当前节点的状态。通常发生在 `head` 节点,比如 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,表示已经对 `head` 节点的后继节点唤醒了。 |
| `0 -> PROPAGATE` | AQS 内部引入了 `PROPAGATE` 状态,为了解决并发场景下,可能造成的线程节点无法唤醒的情况。(在 AQS 共享模式获取资源的源码分析会讲到) |
### 自定义同步器
@ -200,7 +211,7 @@ public final void acquire(int arg) {
- `tryAcquire()` :尝试获取锁(模板方法),`AQS` 不提供具体实现,由子类实现。
- `addWaiter()` :如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。
- `acquireQueued()` :对线程进行阻塞、唤醒,并调用 `tryAcquire()` 方法让队列中的线程尝试获取锁。
- `acquireQueued()` :对线程进行阻塞,并调用 `tryAcquire()` 方法让队列中的线程尝试获取锁。
#### `tryAcquire()` 分析
@ -583,7 +594,7 @@ private Node addWaiter(Node mode) {
这样如果从 `head` 指针向后遍历,无法找到新入队的节点,因此需要从 `tail` 指针向前遍历找到新入队的节点。
### 图解 AQS 工作原理
### 图解 AQS 工作原理(独占模式)
至此AQS 中以独占模式获取资源、释放资源的源码就讲完了。为了对 AQS 的工作原理、节点状态变化有一个更加清晰的认识,接下来会通过画图的方式来了解整个 AQS 的工作原理。
@ -613,6 +624,306 @@ private Node addWaiter(Node mode) {
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-5.png)
### AQS 资源获取源码分析(共享模式)
AQS 中以独占模式获取资源的入口方法是 `acquireShared()` ,如下:
```JAVA
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
```
`acquireShared()` 方法中,会先尝试获取共享锁,如果获取失败,则将当前线程加入到队列中阻塞,等待唤醒后尝试获取共享锁,分别对应一下两个方法:`tryAcquireShared()``doAcquireShared()`
其中 `tryAcquireShared()` 方法是 AQS 提供的模板方法,由同步器来实现具体逻辑。因此这里以 `Semaphore` 为例,来分析共享模式下,如何获取资源。
#### `tryAcquireShared()` 分析
`Semaphore` 中实现了公平锁和非公平锁,接下来以非公平锁为例来分析 `tryAcquireShared()` 源码。
`Semaphore` 中重写的 `tryAcquireShared()` 方法会调用下边的 `nonfairTryAcquireShared()` 方法:
```JAVA
// Semaphore 重写 AQS 的模板方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Semaphore
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 1、获取可用资源数量。
int available = getState();
// 2、计算剩余资源数量。
int remaining = available - acquires;
// 3、如果剩余资源数量 < 0则说明资源不足直接返回如果 CAS 更新 state 成功则说明当前线程获取到了共享资源直接返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
```
在共享模式下AQS 中的 `state` 值表示共享资源的数量。
`nonfairTryAcquireShared()` 方法中,会在死循环中不断尝试获取资源,如果 「剩余资源数不足」 或者 「当前线程成功获取资源」 ,就退出死循环。方法返回 **剩余的资源数量** ,根据返回值的不同,分为 3 种情况:
- **剩余资源数量 > 0** :表示成功获取资源,并且后续的线程也可以成功获取资源。
- **剩余资源数量 = 0** :表示成功获取资源,但是后续的线程无法成功获取资源。
- **剩余资源数量 < 0** :表示获取资源失败。
#### `doAcquireShared()` 分析
为了方便阅读,这里再贴一下获取资源的入口方法 `acquireShared()`
```JAVA
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
```
`acquireShared()` 方法中,会先通过 `tryAcquireShared()` 尝试获取资源。
如果发现方法的返回值 `< 0` ,即剩余的资源数小于 0则表明当前线程获取资源失败。因此会进入 `doAcquireShared()` 方法,将当前线程加入到 AQS 队列进行等待。如下:
```JAVA
// AQS
private void doAcquireShared(int arg) {
// 1、将当前线程加入到队列中等待。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 2、如果当前线程是等待队列的第一个节点则尝试获取资源。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 3、将当前线程节点移出等待队列并唤醒后续线程节点。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取资源失败就会取消获取资源将节点状态更新为 CANCELLED。
if (failed)
cancelAcquire(node);
}
}
```
由于当前线程已经尝试获取资源失败了,因此在 `doAcquireShared()` 方法中,需要将当前线程封装为 Node 节点,加入到队列中进行等待。
**共享模式** 获取资源和 **独占模式** 获取资源最大的不同之处在于:共享模式下,资源的数量可能会大于 1即可以多个线程同时持有资源。
因此在共享模式下,当线程线程被唤醒之后,获取到了资源,如果发现还存在剩余资源,就会尝试唤醒后边的线程去尝试获取资源。对应的 `setHeadAndPropagate()` 方法如下:
```JAVA
// AQS
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 1、将当前线程节点移出等待队列。
setHead(node);
// 2、唤醒后续等待节点。
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
```
`setHeadAndPropagate()` 方法中,唤醒后续节点需要满足一定的条件,主要需要满足 2 个条件:
- `propagate > 0` `propagate` 代表获取资源之后剩余的资源数量,如果 `> 0` ,则可以唤醒后续线程去获取资源。
- `h.waitStatus < 0` :这里的 `h` 节点是执行 `setHead()` 之前的 `head` 节点。判断 `head.waitStatus` 时使用 `< 0` ,主要为了确定 `head` 节点的状态为 `SIGNAL``PROPAGATE` 。如果 `head` 节点为 `SIGNAL` ,则可以唤醒后续节点;如果 `head` 节点状态为 `PROPAGATE` ,也可以唤醒后续节点(这是为了解决并发场景下出现的问题,后续会细讲)。
代码中关于 **唤醒后续等待节点**`if` 判断稍微复杂一些,这里来讲一下为什么这样写:
```JAVA
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0)
```
- `h == null || h.waitStatus < 0` `h == null` 用于防止空指针异常。正常情况下 h 不会为 `null` ,因为执行到这里之前,当前节点已经加入到队列中了,队列不可能还没有初始化。
`h.waitStatus < 0` 主要判断 `head` 节点的状态是否为 `SIGNAL` 或者 `PROPAGATE` ,直接使用 `< 0` 来判断比较方便。
- `(h = head) == null || h.waitStatus < 0` :如果到这里说明之前判断的 `h.waitStatus < 0` ,说明存在并发。
同时存在其他线程在唤醒后续节点,已经将 `head` 节点的值由 `SIGNAL` 修改为 `0` 了。因此,这里重新获取新的 `head` 节点,这次获取的 `head` 节点为通过 `setHead()` 设置的当前线程节点,之后再次判断 `waitStatus` 状态。
如果 `if` 条件判断通过,就会走到 `doReleaseShared()` 方法唤醒后续等待节点,如下:
```JAVA
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1、队列中至少需要一个等待的线程节点。
if (h != null && h != tail) {
int ws = h.waitStatus;
// 2、如果 head 节点的状态为 SIGNAL则可以唤醒后继节点。
if (ws == Node.SIGNAL) {
// 2.1 清除 head 节点的 SIGNAL 状态,更新为 0。表示已经唤醒该节点的后继节点了。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 2.2 唤醒后继节点
unparkSuccessor(h);
}
// 3、如果 head 节点的状态为 0则更新为 PROPAGATE。这是为了解决并发场景下存在的问题接下来会细讲。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
```
`doReleaseShared()` 方法中,会判断 `head` 节点的 `waitStatus` 状态来决定接下来的操作,有两种情况:
- `head` 节点的状态为 `SIGNAL` :表明 `head` 节点存在后继节点需要唤醒,因此通过 `CAS` 操作将 `head` 节点的 `SIGNAL` 状态更新为 `0` 。通过清除 `SIGNAL` 状态来表示已经对 `head` 节点的后继节点进行唤醒操作了。
- `head` 节点的状态为 `0` :表明存在并发情况,需要将 `0` 修改为 `PROPAGATE` 来保证在并发场景下可以正常唤醒线程。
#### 为什么需要 `PROPAGATE` 状态?
`doReleaseShared()` 释放资源时,第 3 步不太容易理解,即如果发现 `head` 节点的状态是 `0` ,就将 `head` 节点的状态由 `0` 更新为 `PROPAGATE`
AQS 中Node 节点的 `PROPAGATE` 就是为了处理并发场景下可能出现的无法唤醒线程节点的问题。`PROPAGATE` 只在 `doReleaseShared()` 方法中用到一次。
**接下来通过案例分析,为什么需要 `PROPAGATE` 状态?**
在共享模式下,线程获取和释放资源的方法调用链如下:
- 线程获取资源的方法调用链为: `acquireShared() -> tryAcquireShared() -> 线程阻塞等待唤醒 -> tryAcquireShared() -> setHeadAndPropagate() -> if (剩余资源数 > 0) || (head.waitStatus < 0) 则唤醒后续节点`
- 线程释放资源的方法调用链为: `releaseShared() -> tryReleaseShared() -> doReleaseShared()`
**如果在释放资源时,没有将 `head` 节点的状态由 `0` 改为 `PROPAGATE` **
假设总共有 4 个线程尝试以共享模式获取资源,总共有 2 个资源。初始 `T3``T4` 线程获取到了资源,`T1``T2` 线程没有获取到,因此在队列中排队等候。
- 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为(括号内为节点的 `waitStatus` 状态):
`head(-1) -> T1(-1) -> T2(0)`
- 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0) -> T1(-1) -> T2(0)`
- 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中无法唤醒 `head` 的后继节点, 之后线程 `T4` 退出。
- 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。
但是此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,并且 `head` 节点的状态为 `0` ,因此线程 `T1` 并不会在 `setHeadAndPropagate()` 方法中唤醒后续节点。此时等待队列内节点状态为:
`head(-1线程 T1 节点) -> T2(0)`
此时,就导致线程 `T2` 节点在等待队列中,无法被唤醒。对应时刻表如下:
| 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
| ------ | -------------------------------------------------------------- | -------- | ---------------- | ------------------------------------------------------------- | --------------------------------- |
| 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
| 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 3 | | 等待队列 | 已退出 | (执行)释放资源。但 `head` 节点状态为 `0` ,无法唤醒后继节点 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1线程 T1 节点) -> T2(0)` |
**如果在线程释放资源时,将 `head` 节点的状态由 `0` 改为 `PROPAGATE` ,则可以解决上边出现的并发问题,如下:**
- 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为:
`head(-1) -> T1(-1) -> T2(0)`
- 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0) -> T1(-1) -> T2(0)`
- 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中会将 `head` 节点的状态由 `0` 更新为 `PROPAGATE` 之后线程 `T4` 退出。此时等待队列内节点状态为:
`head(PROPAGATE) -> T1(-1) -> T2(0)`
- 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(-1线程 T1 节点) -> T2(0)`
- 在时刻 5 时,虽然此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,但是 `head` 节点状态为 `PROPAGATE < 0` (这里的 `head` 节点是老的 `head` 节点,而不是刚成为 `head` 节点的线程 `T1` 节点)。
因此线程 `T1` 会在 `setHeadAndPropagate()` 方法中唤醒后续 `T2` 节点,并将 `head` 节点的状态由 `SIGNAL` 更新为 `0`。此时等待队列内节点状态为:
`head(0线程 T1 节点) -> T2(0)`
- 在时刻 6 时,线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0线程 T2 节点)`
有了 `PROPAGATE` 状态,就可以避免线程 `T2` 无法被唤醒的情况。对应时刻表如下:
| 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
| ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ---------------- | ------------------------------------------------------------------- | ------------------------------------ |
| 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
| 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 3 | 未继续向下执行 | 等待队列 | 已退出 | (执行)释放资源。此时会将 `head` 节点状态由 `0` 更新为 `PROPAGATE` | `head(PROPAGATE) -> T1(-1) -> T2(0)` |
| 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1线程 T1 节点) -> T2(0)` |
| 时刻 5 | (执行)由于 `head` 节点状态为 `PROPAGATE < 0` ,因此会在 `setHeadAndPropagate()` 方法中唤醒后续节点,此时将新的 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T2` | 等待队列 | 已退出 | 已退出 | `head(0线程 T1 节点) -> T2(0)` |
| 时刻 6 | 已退出 | (执行)线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点 | 已退出 | 已退出 | `head(0线程 T2 节点)` |
### AQS 资源释放源码分析(共享模式)
AQS 中以共享模式释放资源的入口方法是 `releaseShared()` ,代码如下:
```JAVA
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
```
其中 `tryReleaseShared()` 方法是 AQS 提供的模板方法,这里同样以 `Semaphore` 来讲解,如下:
```JAVA
// Semaphore
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
```
`Semaphore` 实现的 `tryReleaseShared()` 方法中,会在死循环内不断尝试释放资源,即通过 `CAS` 操作来更新 `state` 值。
如果更新成功,则证明资源释放成功,会进入到 `doReleaseShared()` 方法。
`doReleaseShared()` 方法在前文获取资源(共享模式)的部分已进行了详细的源码分析,此处不再重复。
## 常见同步工具类
下面介绍几个基于 AQS 的常见同步工具类。

View File

@ -895,34 +895,40 @@ CompletableFuture.runAsync(() -> {
## AQS
关于 AQS 源码的详细分析,可以看看这一篇文章:[AQS 详解](./aqs.md)。
### AQS 是什么?
AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面
AQS `AbstractQueuedSynchronizer` ,抽象队列同步器)是从 JDK1.5 开始提供的 Java 并发核心组件
![](https://oss.javaguide.cn/github/javaguide/java/AQS.png)
AQS 解决了开发者在实现同步器时的复杂性问题。它提供了一个通用框架,用于实现各种同步器,例如 **可重入锁**`ReentrantLock`)、**信号量**`Semaphore`)和 **倒计时器**`CountDownLatch`。通过封装底层的线程同步机制AQS 将复杂的线程管理逻辑隐藏起来,使开发者只需专注于具体的同步逻辑。
AQS 就是一个抽象类,主要用来构建锁和同步器。
```java
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
```
AQS 为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
简单来说AQS 是一个抽象类,为同步器提供了通用的 **执行框架**。它定义了 **资源获取和释放的通用流程**,而具体的资源获取逻辑则由具体同步器通过重写模板方法来实现。 因此,可以将 AQS 看作是同步器的 **基础“底座”**,而同步器则是基于 AQS 实现的 **具体“应用”**
### ⭐AQS 的原理是什么?
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是**CLH 队列锁** 实现的,即将暂时获取不到锁的线程加入到队列中
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** Craig, Landin, and Hagersten locks 进一步优化实现的。
CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。在 CLH 同步队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
**CLH 锁** 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。**CLH 锁** 的队列结构如下图所示。
CLH 队列结构如下图所示:
![CLH 锁的队列结构](https://oss.javaguide.cn/github/javaguide/open-source-project/clh-lock-queue-structure.png)
![](https://oss.javaguide.cn/p3-juejin/40cb932a64694262993907ebda6a0bfe~tplv-k3u1fbpfcp-zoom-1.png)
AQS 中使用的 **等待队列** 是 CLH 锁队列的变体(接下来简称为 CLH 变体队列)。
AQS(`AbstractQueuedSynchronizer`)的核心原理图(图源[Java 并发之 AQS 详解](https://www.cnblogs.com/waterystone/p/4920797.html))如下
AQS 的 CLH 变体队列是一个双向队列会暂时获取不到锁的线程将被加入到该队列中CLH 变体队列和原本的 CLH 锁队列的区别主要有两点
![](https://oss.javaguide.cn/github/javaguide/java/CLH.png)
- 由 **自旋** 优化为 **自旋 + 阻塞** :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
- 由 **单向队列** 优化为 **双向队列** :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 `next` 指针,成为了双向队列。
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点Node来实现锁的分配。在 CLH 变体队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
AQS 中的 CLH 变体队列结构如下图所示:
![CLH 变体队列结构](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure-bianti.png)
AQS(`AbstractQueuedSynchronizer`)的核心原理图:
![CLH 变体队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **线程等待队列** 来完成获取资源线程的排队工作。
@ -952,7 +958,7 @@ protected final boolean compareAndSetState(int expect, int update) {
`ReentrantLock` 为例,`state` 初始值为 0表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()``state=`0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后动作。
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后动作。
### Semaphore 有什么用?