mirror of
https://github.com/Snailclimb/JavaGuide
synced 2025-06-16 18:10:13 +08:00
[docs update]完善 AQS 内容
This commit is contained in:
parent
6d35f83a6b
commit
15f55e4df4
@ -20,31 +20,91 @@ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchron
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
AQS 为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`等等皆是基于 AQS 的。
|
AQS 为构建锁和同步器提供了一些通用功能的实现。因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`等等皆是基于 AQS 的。
|
||||||
|
|
||||||
## AQS 原理
|
## AQS 原理
|
||||||
|
|
||||||
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
|
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
|
||||||
|
|
||||||
|
### AQS 快速了解
|
||||||
|
|
||||||
|
在真正讲解 AQS 源码之前,需要对 AQS 有一个整体层面的认识。这里会先通过几个问题,先从整体层面上认识 AQS,了解 AQS 在整个 Java 并发中处于的层面,之后在学习 AQS 源码的过程中,才能更加了解同步器和 AQS 之间的关系。
|
||||||
|
|
||||||
|
- **`AQS` 的作用是什么?**
|
||||||
|
|
||||||
|
如果没有 AQS 的话,想要控制多线程的同步,需要通过 `synchronized` 关键字来完成,而 `synchronized` 又是 JVM 层面的,使用起来不太灵活。
|
||||||
|
|
||||||
|
因此,需要在 Java 语言层面实现一套同步控制器,也就是 AQS(抽象队列同步器)。
|
||||||
|
|
||||||
|
为了控制多线程同步访问共享资源,AQS 内部会提供一个队列,当线程获取不到共享资源时,会进入队列中等待,以此来实现多线程的同步访问。
|
||||||
|
|
||||||
|
- **`AQS` 为什么使用 CLH 锁队列的变体?**
|
||||||
|
|
||||||
|
CLH 锁是基于自旋锁的优化。
|
||||||
|
|
||||||
|
先说一下自旋锁存在的问题:自旋锁指线程不断对一个原子变量执行 `compareAndSet`(简称 `CAS` ) 操作,在并发环境下,多个线程会同时对一个原子变量执行 `CAS` ,就有可能存在某个线程的 `CAS` 操作一直失败,存在 “饥饿” 问题。
|
||||||
|
|
||||||
|
因此 CLH 锁对自旋锁进行了改进。CLH 锁会将并发竞争的线程组织成一个队列,队列中的每个线程节点会不断自旋访问前一个线程节点的状态。通过 CLH 锁形成的队列,就可以将各个竞争的线程进行排队,避免出现 “饥饿” 问题。
|
||||||
|
|
||||||
|
而 AQS 又基于 CLH 锁进一步进行改进,源码作者(Doug Lea)称 AQS 内部的队列为 CLH 锁队列的变体,主要进行了两点改进:
|
||||||
|
|
||||||
|
- 由 **自旋** 优化为 **自旋 + 阻塞** :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 锁队列的变体中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
|
||||||
|
- 由 **单向队列** 优化为 **双向队列** :在 CLH 锁队列的变体中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 `next` 指针,成为了双向队列。
|
||||||
|
|
||||||
|
- **`AQS` 和同步器( `ReentrantLock` 、 `Semaphore` 等)之间的关系是怎样的?**
|
||||||
|
|
||||||
|
AQS 是一个抽象类,为同步器提供了执行框架。 **获取资源的流程** 已经在 AQS 中定义好了,具体如何获取资源则由同步器来实现。
|
||||||
|
|
||||||
|
同步器只需要基于 AQS 重写获取和释放资源的模板方法即可,因此 AQS 是底座,同步器是上层应用。
|
||||||
|
|
||||||
|
- **`AQS` 的性能比较好,原因是什么?**
|
||||||
|
|
||||||
|
因为 AQS 里使用了 `CAS` + `线程阻塞/唤醒` 。
|
||||||
|
|
||||||
|
在 AQS 的实现里,大量使用了 `CAS` 操作,`CAS` 基于内存地址直接进行数据修改,保证并发安全的同时,性能也很好。
|
||||||
|
|
||||||
|
但是如果一直通过 `CAS` 操作来更新数据,会比较占用 CPU。因此 AQS 同时结合了 `CAS` 和 `线程的阻塞/唤醒` 机制,当 `CAS` 没有成功获取资源时,会对线程进行阻塞,避免一直空转占用 CPU 资源。
|
||||||
|
|
||||||
|
- **`AQS` 中为什么 Node 节点需要不同的状态?**
|
||||||
|
|
||||||
|
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
|
||||||
|
|
||||||
|
在 AQS 中,一个节点加入队列之后,初始状态为 `0` 。
|
||||||
|
|
||||||
|
当有新的节点加入队列,此时新节点的前继节点状态就会由 `0` 更新为 `SIGNAL` ,表示前继节点释放锁之后,需要对新节点进行唤醒操作。
|
||||||
|
|
||||||
|
如果一个节点在队列中等待获取锁锁时,因为某种原因失败了,该节点的状态就会变为 `CANCELLED` ,表明取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点。
|
||||||
|
|
||||||
### AQS 核心思想
|
### AQS 核心思想
|
||||||
|
|
||||||
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** (Craig, Landin, and Hagersten locks) 实现的。
|
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** (Craig, Landin, and Hagersten locks) 进一步优化实现的。
|
||||||
|
|
||||||
CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
|
**CLH 锁** 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。**CLH 锁** 的队列结构如下图所示。
|
||||||
|
|
||||||
CLH 队列结构如下图所示:
|

|
||||||
|
|
||||||

|
AQS 中使用的 **等待队列** 是 CLH 锁队列的变体(接下来简称为 CLH 变体队列)。
|
||||||
|
|
||||||
|
AQS 的 CLH 变体队列是一个双向队列,会暂时获取不到锁的线程将被加入到该队列中,CLH 变体队列和原本的 CLH 锁队列的区别主要有两点:
|
||||||
|
|
||||||
|
- 由 **自旋** 优化为 **自旋 + 阻塞** :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
|
||||||
|
- 由 **单向队列** 优化为 **双向队列** :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 `next` 指针,成为了双向队列。
|
||||||
|
|
||||||
|
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点(Node)来实现锁的分配。在 CLH 变体队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
|
||||||
|
|
||||||
|
AQS 中的 CLH 变体队列结构如下图所示:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
关于 AQS 核心数据结构-CLH 锁的详细解读,强烈推荐阅读 [Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙](https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg) 这篇文章。
|
关于 AQS 核心数据结构-CLH 锁的详细解读,强烈推荐阅读 [Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙](https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg) 这篇文章。
|
||||||
|
|
||||||
AQS(`AbstractQueuedSynchronizer`)的核心原理图:
|
AQS(`AbstractQueuedSynchronizer`)的核心原理图:
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
|
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
|
||||||
|
|
||||||
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
|
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获取情况。
|
||||||
|
|
||||||
```java
|
```java
|
||||||
// 共享变量,使用volatile修饰保证线程可见性
|
// 共享变量,使用volatile修饰保证线程可见性
|
||||||
@ -68,7 +128,7 @@ protected final boolean compareAndSetState(int expect, int update) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
以可重入的互斥锁 `ReentrantLock` 为例,它的内部维护了一个 `state` 变量,用来表示锁的占用状态。`state` 的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 `lock()` 方法时,会尝试通过 `tryAcquire()` 方法独占该锁,并让 `state` 的值加 1。如果成功了,那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(`state` 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 `state` 的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
|
以可重入的互斥锁 `ReentrantLock` 为例,它的内部维护了一个 `state` 变量,用来表示锁的占用状态。`state` 的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 `lock()` 方法时,会尝试通过 `tryAcquire()` 方法独占该锁,并让 `state` 的值加 1。如果成功了,那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 变体队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(`state` 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 `state` 的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
|
||||||
|
|
||||||
线程 A 尝试获取锁的过程如下图所示(图源[从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队](./reentrantlock.md)):
|
线程 A 尝试获取锁的过程如下图所示(图源[从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队](./reentrantlock.md)):
|
||||||
|
|
||||||
@ -76,20 +136,28 @@ protected final boolean compareAndSetState(int expect, int update) {
|
|||||||
|
|
||||||
再以倒计时器 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 `countDown()` 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 `state` 的值减少 1。当所有的子线程都执行完毕后(即 `state` 的值变为 0),`CountDownLatch` 会调用 `unpark()` 方法,唤醒主线程。这时,主线程就可以从 `await()` 方法(`CountDownLatch` 中的`await()` 方法而非 AQS 中的)返回,继续执行后续的操作。
|
再以倒计时器 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 `countDown()` 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 `state` 的值减少 1。当所有的子线程都执行完毕后(即 `state` 的值变为 0),`CountDownLatch` 会调用 `unpark()` 方法,唤醒主线程。这时,主线程就可以从 `await()` 方法(`CountDownLatch` 中的`await()` 方法而非 AQS 中的)返回,继续执行后续的操作。
|
||||||
|
|
||||||
### AQS 资源共享方式
|
### Node 节点 waitStatus 状态含义
|
||||||
|
|
||||||
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
|
| Node 节点状态 | 值 | 含义 |
|
||||||
|
| ------------- | --- | ------------------------------------------------------------------------------------------------------------------------- |
|
||||||
|
| `CANCELLED` | 1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。 |
|
||||||
|
| `SIGNAL` | -1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
|
||||||
|
| `CONDITION` | -2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 `signal()` 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
|
||||||
|
| `PROPAGATE` | -3 | 用于共享模式,在共享模式下,前继节点不仅会唤醒后继节点,同时也可能会唤醒后继节点的后继节点。 |
|
||||||
|
| | 0 | 加入队列的新节点的初始状态。 |
|
||||||
|
|
||||||
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。
|
如果 `waitStatus > 0` ,表明节点的状态已经取消等待获取资源。
|
||||||
|
|
||||||
|
如果 `waitStatus < 0` ,表明节点的处于有效的等待状态。
|
||||||
|
|
||||||
|
因此在 AQS 的源码中,经常使用 `> 0` 、 `< 0` 来对 `waitStatus` 进行判断。
|
||||||
|
|
||||||
### 自定义同步器
|
### 自定义同步器
|
||||||
|
|
||||||
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
|
基于 AQS 可以实现自定义的同步器, AQS 提供了 5 个模板方法(模板方法模式)。如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
|
||||||
|
|
||||||
1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。
|
1. 自定义的同步器继承 `AbstractQueuedSynchronizer` 。
|
||||||
2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
|
2. 重写 AQS 暴露的模板方法。
|
||||||
|
|
||||||
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
|
|
||||||
|
|
||||||
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
|
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
|
||||||
|
|
||||||
@ -112,6 +180,444 @@ protected boolean isHeldExclusively()
|
|||||||
|
|
||||||
除了上面提到的钩子方法之外,AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。
|
除了上面提到的钩子方法之外,AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。
|
||||||
|
|
||||||
|
### AQS 资源共享方式
|
||||||
|
|
||||||
|
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
|
||||||
|
|
||||||
|
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。
|
||||||
|
|
||||||
|
### AQS 资源获取源码分析(独占模式)
|
||||||
|
|
||||||
|
AQS 中以独占模式获取资源的入口方法是 `acquire()` ,如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
public final void acquire(int arg) {
|
||||||
|
if (!tryAcquire(arg) &&
|
||||||
|
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
|
||||||
|
selfInterrupt();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `acquire()` 中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
|
||||||
|
|
||||||
|
- `tryAcquire()` :尝试获取锁(模板方法),`AQS` 不提供具体实现,由子类实现。
|
||||||
|
- `addWaiter()` :如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。
|
||||||
|
- `acquireQueued()` :对线程进行阻塞、唤醒,并调用 `tryAcquire()` 方法让队列中的线程尝试获取锁。
|
||||||
|
|
||||||
|
#### `tryAcquire()` 分析
|
||||||
|
|
||||||
|
AQS 中对应的 `tryAcquire()` 模板方法如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
protected boolean tryAcquire(int arg) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`tryAcquire()` 方法是 AQS 提供的模板方法,不提供默认实现。
|
||||||
|
|
||||||
|
因此,这里分析 `tryAcquire()` 方法时,以 `ReentrantLock` 的非公平锁(独占锁)为例进行分析,`ReentrantLock` 内部实现的 `tryAcquire()` 会调用到下边的 `nonfairTryAcquire()` :
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// ReentrantLock
|
||||||
|
final boolean nonfairTryAcquire(int acquires) {
|
||||||
|
final Thread current = Thread.currentThread();
|
||||||
|
// 1、获取 AQS 中的 state 状态
|
||||||
|
int c = getState();
|
||||||
|
// 2、如果 state 为 0,证明锁没有被其他线程占用
|
||||||
|
if (c == 0) {
|
||||||
|
// 2.1、通过 CAS 对 state 进行更新
|
||||||
|
if (compareAndSetState(0, acquires)) {
|
||||||
|
// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程
|
||||||
|
setExclusiveOwnerThread(current);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 3、如果当前线程和锁的持有线程相同,说明发生了「锁的重入」
|
||||||
|
else if (current == getExclusiveOwnerThread()) {
|
||||||
|
int nextc = c + acquires;
|
||||||
|
if (nextc < 0) // overflow
|
||||||
|
throw new Error("Maximum lock count exceeded");
|
||||||
|
// 3.1、将锁的重入次数加 1
|
||||||
|
setState(nextc);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// 4、如果锁被其他线程占用,就返回 false,表示获取锁失败
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `nonfairTryAcquire()` 方法内部,主要通过两个核心操作去完成资源的获取:
|
||||||
|
|
||||||
|
- 通过 `CAS` 更新 `state` 变量。`state == 0` 表示资源没有被占用。`state > 0` 表示资源被占用,此时 `state` 表示重入次数。
|
||||||
|
- 通过 `setExclusiveOwnerThread()` 设置持有资源的线程。
|
||||||
|
|
||||||
|
如果线程更新 `state` 变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。
|
||||||
|
|
||||||
|
#### `addWaiter()` 分析
|
||||||
|
|
||||||
|
在通过 `tryAcquire()` 方法尝试获取资源失败之后,会调用 `addWaiter()` 方法将当前线程封装为 Node 节点加入 `AQS` 内部的队列中。`addWaite()` 代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
private Node addWaiter(Node mode) {
|
||||||
|
// 1、将当前线程封装为 Node 节点。
|
||||||
|
Node node = new Node(Thread.currentThread(), mode);
|
||||||
|
Node pred = tail;
|
||||||
|
// 2、如果 pred != null,则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。
|
||||||
|
if (pred != null) {
|
||||||
|
node.prev = pred;
|
||||||
|
// 2.1、通过 CAS 控制并发安全。
|
||||||
|
if (compareAndSetTail(pred, node)) {
|
||||||
|
pred.next = node;
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 3、初始化队列,并将新创建的 Node 节点加入队列。
|
||||||
|
enq(node);
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**节点入队的并发安全:**
|
||||||
|
|
||||||
|
在 `addWaiter()` 方法中,需要执行 Node 节点 **入队** 的操作。由于是在多线程环境下,因此需要通过 `CAS` 操作保证并发安全。
|
||||||
|
|
||||||
|
通过 `CAS` 操作去更新 `tail` 指针指向新入队的 Node 节点,`CAS` 可以保证只有一个线程会成功修改 `tail` 指针,以此来保证 Node 节点入队时的并发安全。
|
||||||
|
|
||||||
|
**AQS 内部队列的初始化:**
|
||||||
|
|
||||||
|
在执行 `addWaiter()` 时,如果发现 `pred == null` ,即 `tail` 指针为 null,则证明队列没有初始化,需要调用 `enq()` 方法初始化队列,并将 `Node` 节点加入到初始化后的队列中,代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
private Node enq(final Node node) {
|
||||||
|
for (;;) {
|
||||||
|
Node t = tail;
|
||||||
|
if (t == null) {
|
||||||
|
// 1、通过 CAS 操作保证队列初始化的并发安全
|
||||||
|
if (compareAndSetHead(new Node()))
|
||||||
|
tail = head;
|
||||||
|
} else {
|
||||||
|
// 2、与 addWaiter() 方法中节点入队的操作相同
|
||||||
|
node.prev = t;
|
||||||
|
if (compareAndSetTail(t, node)) {
|
||||||
|
t.next = node;
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `enq()` 方法中初始化队列,在初始化过程中,也需要通过 `CAS` 来保证并发安全。
|
||||||
|
|
||||||
|
初始化队列总共包含两个步骤:初始化 `head` 节点、`tail` 指向 `head` 节点。
|
||||||
|
|
||||||
|
**初始化后的队列如下图所示:**
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
#### `acquireQueued()` 分析
|
||||||
|
|
||||||
|
为了方便阅读,这里再贴一下 `AQS` 中 `acquire()` 获取资源的代码:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
public final void acquire(int arg) {
|
||||||
|
if (!tryAcquire(arg) &&
|
||||||
|
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
|
||||||
|
selfInterrupt();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `acquire()` 方法中,通过 `addWaiter()` 方法将 `Node` 节点加入队列之后,就会调用 `acquireQueued()` 方法。代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS:令队列中的节点尝试获取锁,并且对线程进行阻塞。
|
||||||
|
final boolean acquireQueued(final Node node, int arg) {
|
||||||
|
boolean failed = true;
|
||||||
|
try {
|
||||||
|
boolean interrupted = false;
|
||||||
|
for (;;) {
|
||||||
|
// 1、尝试获取锁。
|
||||||
|
final Node p = node.predecessor();
|
||||||
|
if (p == head && tryAcquire(arg)) {
|
||||||
|
setHead(node);
|
||||||
|
p.next = null; // help GC
|
||||||
|
failed = false;
|
||||||
|
return interrupted;
|
||||||
|
}
|
||||||
|
// 2、判断线程是否可以阻塞,如果可以,则阻塞当前线程。
|
||||||
|
if (shouldParkAfterFailedAcquire(p, node) &&
|
||||||
|
parkAndCheckInterrupt())
|
||||||
|
interrupted = true;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// 3、如果获取锁失败,就会取消获取锁,将节点状态更新为 CANCELLED。
|
||||||
|
if (failed)
|
||||||
|
cancelAcquire(node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `acquireQueued()` 方法中,主要做两件事情:
|
||||||
|
|
||||||
|
- **尝试获取资源:** 当前线程加入队列之后,如果发现前继节点是 `head` 节点,说明当前线程是队列中第一个等待的节点,于是调用 `tryAcquire()` 尝试获取资源。
|
||||||
|
|
||||||
|
- **阻塞当前线程** :如果尝试获取资源失败,就需要阻塞当前线程,等待被唤醒之后获取资源。
|
||||||
|
|
||||||
|
**1、尝试获取资源**
|
||||||
|
|
||||||
|
在 `acquireQueued()` 方法中,尝试获取资源总共有 2 个步骤:
|
||||||
|
|
||||||
|
- `p == head` :表明当前节点的前继节点为 `head` 节点。此时当前节点为 AQS 队列中的第一个等待节点。
|
||||||
|
- `tryAcquire(arg) == true` :表明当前线程尝试获取资源成功。
|
||||||
|
|
||||||
|
在成功获取资源之后,就需要将当前线程的节点 **从等待队列中移除** 。移除操作为:将当前等待的线程节点设置为 `head` 节点(`head` 节点是虚拟节点,并不参与排队获取资源)。
|
||||||
|
|
||||||
|
**2、阻塞当前线程**
|
||||||
|
|
||||||
|
在 `AQS` 中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为 `CANCELLED` ,`CANCELLED` 状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过 `CANCELLED` 状态的节点。
|
||||||
|
|
||||||
|
通过 `shouldParkAfterFailedAcquire()` 方法来判断当前线程节点是否可以阻塞,如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS:判断当前线程节点是否可以阻塞。
|
||||||
|
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
|
||||||
|
int ws = pred.waitStatus;
|
||||||
|
// 1、前继节点状态正常,直接返回 true 即可。
|
||||||
|
if (ws == Node.SIGNAL)
|
||||||
|
return true;
|
||||||
|
// 2、ws > 0 表示前继节点的状态异常,即为 CANCELLED 状态,需要跳过异常状态的节点。
|
||||||
|
if (ws > 0) {
|
||||||
|
do {
|
||||||
|
node.prev = pred = pred.prev;
|
||||||
|
} while (pred.waitStatus > 0);
|
||||||
|
pred.next = node;
|
||||||
|
} else {
|
||||||
|
// 3、如果前继节点的状态不是 SIGNAL,也不是 CANCELLED,就将状态设置为 SIGNAL。
|
||||||
|
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`shouldParkAfterFailedAcquire()` 方法中的判断逻辑:
|
||||||
|
|
||||||
|
- 如果发现前继节点的状态是 `SIGNAL` ,则可以阻塞当前线程。
|
||||||
|
- 如果发现前继节点的状态是 `CANCELLED` ,则需要跳过 `CANCELLED` 状态的节点。
|
||||||
|
- 如果发现前继节点的状态不是 `SIGNAL` 和 `CANCELLED` ,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为 `SIGNAL` ,表明该前继节点需要对后续节点进行唤醒。
|
||||||
|
|
||||||
|
当判断当前线程可以阻塞之后,通过调用 `parkAndCheckInterrupt()` 方法来阻塞当前线程。内部使用了 `LockSupport` 来实现阻塞。`LockSupoprt` 底层是基于 `Unsafe` 类来阻塞线程,代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
private final boolean parkAndCheckInterrupt() {
|
||||||
|
// 1、线程阻塞到这里
|
||||||
|
LockSupport.park(this);
|
||||||
|
// 2、线程被唤醒之后,返回线程中断状态
|
||||||
|
return Thread.interrupted();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**为什么在线程被唤醒之后,要返回线程的中断状态呢?**
|
||||||
|
|
||||||
|
在 `parkAndCheckInterrupt()` 方法中,当执行完 `LockSupport.park(this)` ,线程会被阻塞,代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
private final boolean parkAndCheckInterrupt() {
|
||||||
|
LockSupport.park(this);
|
||||||
|
// 线程被唤醒之后,需要返回线程中断状态
|
||||||
|
return Thread.interrupted();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
当线程被唤醒之后,需要执行 `Thread.interrupted()` 来返回线程的中断状态,这是为什么呢?
|
||||||
|
|
||||||
|
这个和线程的中断协作机制有关系,线程被唤醒之后,并不确定是被中断唤醒,还是被 `LockSupport.unpark()` 唤醒,因此需要通过线程的中断状态来判断。
|
||||||
|
|
||||||
|
**在 `acquire()` 方法中,为什么需要调用 `selfInterrupt()` ?**
|
||||||
|
|
||||||
|
`acquire()` 方法代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
public final void acquire(int arg) {
|
||||||
|
if (!tryAcquire(arg) &&
|
||||||
|
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
|
||||||
|
selfInterrupt();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `acquire()` 方法中,当 `if` 语句的条件返回 `true` 后,就会调用 `selfInterrupt()` ,该方法会中断当前线程,为什么需要中断当前线程呢?
|
||||||
|
|
||||||
|
当 `if` 判断为 `true` 时,需要 `tryAcquire()` 返回 `false` ,并且 `acquireQueued()` 返回 `true` 。
|
||||||
|
|
||||||
|
其中 `acquireQueued()` 方法返回的是线程被唤醒之后的 **中断状态** ,通过执行 `Thread.interrupted()` 来返回。该方法在返回中断状态的同时,会清除线程的中断状态。
|
||||||
|
|
||||||
|
因此如果 `if` 判断为 `true` ,表明线程的中断状态为 `true` ,但是调用 `Thread.interrupted()` 之后,线程的中断状态被清除为 `false` ,因此需要重新执行 `selfInterrupt()` 来重新设置线程的中断状态。
|
||||||
|
|
||||||
|
### AQS 资源释放源码分析(独占模式)
|
||||||
|
|
||||||
|
AQS 中以独占模式释放资源的入口方法是 `release()` ,代码如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS
|
||||||
|
public final boolean release(int arg) {
|
||||||
|
// 1、尝试释放锁
|
||||||
|
if (tryRelease(arg)) {
|
||||||
|
Node h = head;
|
||||||
|
// 2、唤醒后继节点
|
||||||
|
if (h != null && h.waitStatus != 0)
|
||||||
|
unparkSuccessor(h);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `release()` 方法中,主要做两件事:尝试释放锁和唤醒后继节点。对应方法如下:
|
||||||
|
|
||||||
|
**1、尝试释放锁**
|
||||||
|
|
||||||
|
通过 `tryRelease()` 方法尝试释放锁,该方法为模板方法,由自定义同步器实现,因此这里仍然以 `ReentrantLock` 为例来讲解。
|
||||||
|
|
||||||
|
`ReentrantLock` 中实现的 `tryRelease()` 方法如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// ReentrantLock
|
||||||
|
protected final boolean tryRelease(int releases) {
|
||||||
|
int c = getState() - releases;
|
||||||
|
// 1、判断持有锁的线程是否为当前线程
|
||||||
|
if (Thread.currentThread() != getExclusiveOwnerThread())
|
||||||
|
throw new IllegalMonitorStateException();
|
||||||
|
boolean free = false;
|
||||||
|
// 2、如果 state 为 0,则表明当前线程已经没有重入次数。因此将 free 更新为 true,表明该线程会释放锁。
|
||||||
|
if (c == 0) {
|
||||||
|
free = true;
|
||||||
|
// 3、更新持有资源的线程为 null
|
||||||
|
setExclusiveOwnerThread(null);
|
||||||
|
}
|
||||||
|
// 4、更新 state 值
|
||||||
|
setState(c);
|
||||||
|
return free;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `tryRelease()` 方法中,会先计算释放锁之后的 `state` 值,判断 `state` 值是否为 0。
|
||||||
|
|
||||||
|
- 如果 `state == 0` ,表明该线程没有重入次数了,更新 `free = true` ,并修改持有资源的线程为 null,表明该线程完全释放这把锁。
|
||||||
|
- 如果 `state != 0` ,表明该线程还存在重入次数,因此不更新 `free` 值,`free` 值为 `false` 表明该线程没有完全释放这把锁。
|
||||||
|
|
||||||
|
之后更新 `state` 值,并返回 `free` 值,`free` 值表明线程是否完全释放锁。
|
||||||
|
|
||||||
|
**2、唤醒后继节点**
|
||||||
|
|
||||||
|
如果 `tryRelease()` 返回 `true` ,表明线程已经没有重入次数了,锁已经被完全释放,因此需要唤醒后继节点。
|
||||||
|
|
||||||
|
在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: `h != null && h.waitStatus != 0` 。这里解释一下为什么要这样判断:
|
||||||
|
|
||||||
|
- `h == null` :表明 `head` 节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。
|
||||||
|
- `h != null && h.waitStatus == 0` :表明头节点刚刚初始化完毕(节点的初始化状态为 0),后继节点线程还没有成功入队,因此不需要对后续节点进行唤醒。(当后继节点入队之后,会将前继节点的状态修改为 `SIGNAL` ,表明需要对后继节点进行唤醒)
|
||||||
|
- `h != null && h.waitStatus != 0` :其中 `waitStatus` 有可能大于 0,也有可能小于 0。其中 `> 0` 表明节点已经取消等待获取资源,`< 0` 表明节点处于正常等待状态。
|
||||||
|
|
||||||
|
接下来进入 `unparkSuccessor()` 方法查看如何唤醒后继节点:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS:这里的入参 node 为队列的头节点(虚拟头节点)
|
||||||
|
private void unparkSuccessor(Node node) {
|
||||||
|
int ws = node.waitStatus;
|
||||||
|
// 1、将头节点的状态进行清除,为后续的唤醒做准备。
|
||||||
|
if (ws < 0)
|
||||||
|
compareAndSetWaitStatus(node, ws, 0);
|
||||||
|
|
||||||
|
Node s = node.next;
|
||||||
|
// 2、如果后继节点异常,则需要从 tail 向前遍历,找到正常状态的节点进行唤醒。
|
||||||
|
if (s == null || s.waitStatus > 0) {
|
||||||
|
s = null;
|
||||||
|
for (Node t = tail; t != null && t != node; t = t.prev)
|
||||||
|
if (t.waitStatus <= 0)
|
||||||
|
s = t;
|
||||||
|
}
|
||||||
|
if (s != null)
|
||||||
|
// 3、唤醒后继节点
|
||||||
|
LockSupport.unpark(s.thread);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `unparkSuccessor()` 中,如果头节点的状态 `< 0` (在正常情况下,只要有后继节点,头节点的状态应该为 `SIGNAL` ,即 -1),表示需要对后继节点进行唤醒,因此这里提前清除头节点的状态标识,将状态修改为 0,表示已经执行了对后续节点唤醒的操作。
|
||||||
|
|
||||||
|
如果 `s == null` 或者 `s.waitStatus > 0` ,表明后继节点异常,此时不能唤醒异常节点,而是要找到正常状态的节点进行唤醒。
|
||||||
|
|
||||||
|
因此需要从 `tail` 指针向前遍历,来找到第一个状态正常(`waitStatus <= 0`)的节点进行唤醒。
|
||||||
|
|
||||||
|
**为什么要从 `tail` 指针向前遍历,而不是从 `head` 指针向后遍历,寻找正常状态的节点呢?**
|
||||||
|
|
||||||
|
遍历的方向和 **节点的入队操作** 有关。入队方法如下:
|
||||||
|
|
||||||
|
```JAVA
|
||||||
|
// AQS:节点入队方法
|
||||||
|
private Node addWaiter(Node mode) {
|
||||||
|
Node node = new Node(Thread.currentThread(), mode);
|
||||||
|
Node pred = tail;
|
||||||
|
if (pred != null) {
|
||||||
|
// 1、先修改 prev 指针。
|
||||||
|
node.prev = pred;
|
||||||
|
if (compareAndSetTail(pred, node)) {
|
||||||
|
// 2、再修改 next 指针。
|
||||||
|
pred.next = node;
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enq(node);
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
在 `addWaiter()` 方法中,`node` 节点入队需要修改 `node.prev` 和 `pred.next` 两个指针,但是这两个操作并不是 **原子操作** ,先修改了 `node.prev` 指针,之后才修改 `pred.next` 指针。
|
||||||
|
|
||||||
|
在极端情况下,可能会出现 `head` 节点的下一个节点状态为 `CANCELLED` ,此时新入队的节点仅更新了 `node.prev` 指针,还未更新 `pred.next` 指针,如下图:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
这样如果从 `head` 指针向后遍历,无法找到新入队的节点,因此需要从 `tail` 指针向前遍历找到新入队的节点。
|
||||||
|
|
||||||
|
### 图解 AQS 工作原理
|
||||||
|
|
||||||
|
至此,AQS 中以独占模式获取资源、释放资源的源码就讲完了。为了对 AQS 的工作原理、节点状态变化有一个更加清晰的认识,接下来会通过画图的方式来了解整个 AQS 的工作原理。
|
||||||
|
|
||||||
|
由于 AQS 是底层同步工具,获取和释放资源的方法并没有提供具体实现,因此这里基于 `ReentrantLock` 来画图进行讲解。
|
||||||
|
|
||||||
|
假设总共有 3 个线程同时获取锁,线程分别为 `T1` 、 `T2` 和 `T3` 。
|
||||||
|
|
||||||
|
此时,假设线程 `T1` 先获取到锁,线程 `T2` 排队等待获取资源。在线程 `T2` 进入队列之前,需要对 AQS 内部队列进行初始化。初始的 `head` 节点状态为 `0` 。初始化后的队列如下图:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
此时,线程 `T2` 尝试获取锁。由于线程 `T1` 持有所,因此线程 `T2` 会进入队列中等待获取锁。同时会将前继节点( `head` 节点)的状态由 `0` 修改为 `SIGNAL` ,表示需要对 `head` 节点的后继节点进行唤醒。此时,AQS 内部队列如下图所示:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
此时,线程 `T2` 尝试获取锁。由于线程 `T1` 持有所,因此线程 `T2` 会进入队列中等待获取锁。同时会将前继节点(线程 `T2` 节点)的状态由 `0` 修改为 `SIGNAL` ,表示线程 `T2` 节点需要对后继节点进行唤醒。此时,AQS 内部队列如下图所示:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
此时,假设线程 `T1` 释放锁,会唤醒后继节点 `T2` 。线程 `T2` 被唤醒后获取到锁,并且会从等待队列中退出。
|
||||||
|
|
||||||
|
这里线程 `T2` 节点退出等待队列并不是直接从队列移除,而是令线程 `T2` 节点成为新的 `head` 节点,以此来退出资源获取的等待。此时 AQS 内部队列如下所示:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
此时,线程 `T2` 释放锁,会唤醒后继节点 `T3` 。线程 `T3` 获取到锁之后,同样也退出等待队列,即将线程 `T3` 节点变为 `head` 节点来退出资源获取的等待。
|
||||||
|
|
||||||
|
此时 AQS 内部队列如下所示:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
## 常见同步工具类
|
## 常见同步工具类
|
||||||
|
|
||||||
下面介绍几个基于 AQS 的常见同步工具类。
|
下面介绍几个基于 AQS 的常见同步工具类。
|
||||||
@ -407,7 +913,7 @@ protected boolean tryReleaseShared(int releases) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
|
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 变体CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
|
||||||
|
|
||||||
```java
|
```java
|
||||||
// 等待(也可以叫做加锁)
|
// 等待(也可以叫做加锁)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user