1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-07-28 12:22:17 +08:00

Compare commits

...

9 Commits

Author SHA1 Message Date
Guide
efa2641203 [docs update]aqs详解完善 2024-12-20 15:45:13 +08:00
Guide
5519556cac
Merge pull request #2565 from 1020325258/12-19
AQS 内容完善
2024-12-20 13:14:42 +08:00
Guide
4fbb7639fd
Merge pull request #2564 from flying-pig-z/patch-2
fix: 修改《操作系统常见面试题总结(下)》的一个语句错误
2024-12-20 13:13:48 +08:00
11来了
3db5751dae [docs update]AQS 内容完善 2024-12-20 12:46:07 +08:00
11来了
66e624c636 [docs update]完善 AQS 内容 2024-12-19 23:06:39 +08:00
11来了
15f55e4df4 [docs update]完善 AQS 内容 2024-12-19 23:01:04 +08:00
flying pig
2d9afe1960
Update operating-system-basic-questions-02.md
每个进程对应一个页表,也不是应用程序,因为一个应用程序可能有多个进程。
2024-12-19 16:07:55 +08:00
Guide
c9ac61ddee
Merge pull request #2563 from fengjianche/patch-1
Update gossip-protocl.md
2024-12-18 19:57:23 +08:00
fengjianche
4eb9e2aa7f
Update gossip-protocl.md
Redis Cluster 内部的各个节点是通过 PING 命令来互相来感知节点健康状态的。Gossip 主要用在集群中传播各个节点信息,在各个节点实现节点信息之间的“最终一致性”。
2024-12-18 14:00:28 +08:00
4 changed files with 524 additions and 24 deletions

View File

@ -188,7 +188,7 @@ MMU 将虚拟地址翻译为物理地址的主要机制有 3 种:
![单级页表](https://oss.javaguide.cn/github/javaguide/cs-basics/operating-system/page-table.png)
在分页机制下,每个应用程序都会有一个对应的页表。
在分页机制下,每个进程都会有一个对应的页表。
分页机制下的虚拟地址由两部分组成:

View File

@ -53,7 +53,7 @@ Redis Cluster 的节点之间会相互发送多种 Gossip 消息:
![](./images/gossip/redis-cluster-gossip.png)
有了 Redis Cluster 之后,不需要专门部署 Sentinel 集群服务了。Redis Cluster 相当于是内置了 Sentinel 机制Redis Cluster 内部的各个 Redis 节点通过 Gossip 协议互相探测健康状态,在故障时可以自动切换
有了 Redis Cluster 之后,不需要专门部署 Sentinel 集群服务了。Redis Cluster 相当于是内置了 Sentinel 机制Redis Cluster 内部的各个 Redis 节点通过 Gossip 协议共享集群内信息
关于 Redis Cluster 的详细介绍,可以查看这篇文章 [Redis 集群详解(付费)](https://javaguide.cn/database/redis/redis-cluster.html) 。

View File

@ -20,31 +20,88 @@ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchron
}
```
AQS 为构建锁和同步器提供了一些通用功能的实现因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
AQS 为构建锁和同步器提供了一些通用功能的实现因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
## AQS 原理
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
### AQS 快速了解
在真正讲解 AQS 源码之前,需要对 AQS 有一个整体层面的认识。这里会先通过几个问题,从整体层面上认识 AQS了解 AQS 在整个 Java 并发中所位于的层面,之后在学习 AQS 源码的过程中,才能更加了解同步器和 AQS 之间的关系。
#### AQS 的作用是什么?
AQS 解决了开发者在实现同步器时的复杂性问题。它提供了一个通用框架,用于实现各种同步器,例如 **可重入锁**`ReentrantLock`)、**信号量**`Semaphore`)和 **倒计时器**`CountDownLatch`。通过封装底层的线程同步机制AQS 将复杂的线程管理逻辑隐藏起来,使开发者只需专注于具体的同步逻辑。
简单来说AQS 是一个抽象类,为同步器提供了通用的 **执行框架**。它定义了 **资源获取和释放的通用流程**,而具体的资源获取逻辑则由具体同步器通过重写模板方法来实现。 因此,可以将 AQS 看作是同步器的 **基础“底座”**,而同步器则是基于 AQS 实现的 **具体“应用”**
#### AQS 为什么使用 CLH 锁队列的变体?
CLH 锁是一种基于 **自旋锁** 的优化实现。
先说一下自旋锁存在的问题:自旋锁通过线程不断对一个原子变量执行 `compareAndSet`(简称 `CAS`)操作来尝试获取锁。在高并发场景下,多个线程会同时竞争同一个原子变量,容易造成某个线程的 `CAS` 操作长时间失败,从而导致 **“饥饿”问题**(某些线程可能永远无法获取锁)。
CLH 锁通过引入一个队列来组织并发竞争的线程,对自旋锁进行了改进:
- 每个线程会作为一个节点加入到队列中,并通过自旋监控前一个线程节点的状态,而不是直接竞争共享变量。
- 线程按顺序排队,确保公平性,从而避免了 “饥饿” 问题。
AQSAbstractQueuedSynchronizer在 CLH 锁的基础上进一步优化,形成了其内部的 **CLH 队列变体**。主要改进点有以下两方面:
1. **自旋 + 阻塞** CLH 锁使用纯自旋方式等待锁的释放,但大量的自旋操作会占用过多的 CPU 资源。AQS 引入了 **自旋 + 阻塞** 的混合机制:
- 如果线程获取锁失败,会先短暂自旋尝试获取锁;
- 如果仍然失败,则线程会进入阻塞状态,等待被唤醒,从而减少 CPU 的浪费。
2. **单向队列改为双向队列**CLH 锁使用单向队列节点只知道前驱节点的状态而当某个节点释放锁时需要通过队列唤醒后续节点。AQS 将队列改为 **双向队列**,新增了 `next` 指针,使得节点不仅知道前驱节点,也可以直接唤醒后继节点,从而简化了队列操作,提高了唤醒效率。
#### AQS 的性能比较好,原因是什么?
因为 AQS 里使用了 `CAS` + `线程阻塞/唤醒`
在 AQS 的实现里,大量使用了 `CAS` 操作,`CAS` 基于内存地址直接进行数据修改,保证并发安全的同时,性能也很好。
但是如果一直通过 `CAS` 操作来更新数据,会比较占用 CPU。因此 AQS 同时结合了 `CAS``线程的阻塞/唤醒` 机制,当 `CAS` 没有成功获取资源时,会对线程进行阻塞,避免一直空转占用 CPU 资源。
#### AQS 中为什么 Node 节点需要不同的状态?
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
在 AQS 中,一个节点加入队列之后,初始状态为 `0`
当有新的节点加入队列,此时新节点的前继节点状态就会由 `0` 更新为 `SIGNAL` ,表示前继节点释放锁之后,需要对新节点进行唤醒操作。
如果一个节点在队列中等待获取锁锁时,因为某种原因失败了,该节点的状态就会变为 `CANCELLED` ,表明取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点。
### AQS 核心思想
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** Craig, Landin, and Hagersten locks 实现的。
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** Craig, Landin, and Hagersten locks 进一步优化实现的。
CLH 锁是对自旋锁的一种改进是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点Node来实现锁的分配。在 CLH 队列锁中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
**CLH 锁** 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。**CLH 锁** 的队列结构如下图所示
CLH 队列结构如下图所示:
![CLH 锁的队列结构](https://oss.javaguide.cn/github/javaguide/open-source-project/clh-lock-queue-structure.png)
![CLH 队列结构](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure.png)
AQS 中使用的 **等待队列** 是 CLH 锁队列的变体(接下来简称为 CLH 变体队列)。
AQS 的 CLH 变体队列是一个双向队列会暂时获取不到锁的线程将被加入到该队列中CLH 变体队列和原本的 CLH 锁队列的区别主要有两点:
- 由 **自旋** 优化为 **自旋 + 阻塞** :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
- 由 **单向队列** 优化为 **双向队列** :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 `next` 指针,成为了双向队列。
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点Node来实现锁的分配。在 CLH 变体队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
AQS 中的 CLH 变体队列结构如下图所示:
![CLH 变体队列结构](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure-bianti.png)
关于 AQS 核心数据结构-CLH 锁的详细解读,强烈推荐阅读 [Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙](https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg) 这篇文章。
AQS(`AbstractQueuedSynchronizer`)的核心原理图:
![CLH 队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
![CLH 变体队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获情况。
```java
// 共享变量使用volatile修饰保证线程可见性
@ -68,7 +125,7 @@ protected final boolean compareAndSetState(int expect, int update) {
}
```
以可重入的互斥锁 `ReentrantLock` 为例,它的内部维护了一个 `state` 变量,用来表示锁的占用状态。`state` 的初始值为 0表示锁处于未锁定状态。当线程 A 调用 `lock()` 方法时,会尝试通过 `tryAcquire()` 方法独占该锁,并让 `state` 的值加 1。如果成功了那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列CLH 队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 `state` 的值回到 0也就是让锁恢复到未锁定状态。只有这样其他等待的线程才能有机会获取该锁。
以可重入的互斥锁 `ReentrantLock` 为例,它的内部维护了一个 `state` 变量,用来表示锁的占用状态。`state` 的初始值为 0表示锁处于未锁定状态。当线程 A 调用 `lock()` 方法时,会尝试通过 `tryAcquire()` 方法独占该锁,并让 `state` 的值加 1。如果成功了那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列CLH 变体队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 `state` 的值回到 0也就是让锁恢复到未锁定状态。只有这样其他等待的线程才能有机会获取该锁。
线程 A 尝试获取锁的过程如下图所示(图源[从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队](./reentrantlock.md)
@ -76,20 +133,28 @@ protected final boolean compareAndSetState(int expect, int update) {
再以倒计时器 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 `countDown()` 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 `state` 的值减少 1。当所有的子线程都执行完毕后`state` 的值变为 0`CountDownLatch` 会调用 `unpark()` 方法,唤醒主线程。这时,主线程就可以从 `await()` 方法(`CountDownLatch` 中的`await()` 方法而非 AQS 中的)返回,继续执行后续的操作。
### AQS 资源共享方式
### Node 节点 waitStatus 状态含义
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
| Node 节点状态 | 值 | 含义 |
| ------------- | --- | ------------------------------------------------------------------------------------------------------------------------- |
| `CANCELLED` | 1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。 |
| `SIGNAL` | -1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
| `CONDITION` | -2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 `signal()` 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
| `PROPAGATE` | -3 | 用于共享模式,在共享模式下,前继节点不仅会唤醒后继节点,同时也可能会唤醒后继节点的后继节点。 |
| | 0 | 加入队列的新节点的初始状态。 |
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
如果 `waitStatus > 0` ,表明节点的状态已经取消等待获取资源。
如果 `waitStatus < 0` ,表明节点的处于有效的等待状态。
因此在 AQS 的源码中,经常使用 `> 0``< 0` 来对 `waitStatus` 进行判断。
### 自定义同步器
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
基于 AQS 可以实现自定义的同步器, AQS 提供了 5 个模板方法(模板方法模式)。如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。
2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
1. 自定义的同步器继承 `AbstractQueuedSynchronizer`
2. 重写 AQS 暴露的模板方法。
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
@ -112,6 +177,442 @@ protected boolean isHeldExclusively()
除了上面提到的钩子方法之外AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。
### AQS 资源共享方式
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
### AQS 资源获取源码分析(独占模式)
AQS 中以独占模式获取资源的入口方法是 `acquire()` ,如下:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
- `tryAcquire()` :尝试获取锁(模板方法),`AQS` 不提供具体实现,由子类实现。
- `addWaiter()` :如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。
- `acquireQueued()` :对线程进行阻塞、唤醒,并调用 `tryAcquire()` 方法让队列中的线程尝试获取锁。
#### `tryAcquire()` 分析
AQS 中对应的 `tryAcquire()` 模板方法如下:
```JAVA
// AQS
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
```
`tryAcquire()` 方法是 AQS 提供的模板方法,不提供默认实现。
因此,这里分析 `tryAcquire()` 方法时,以 `ReentrantLock` 的非公平锁(独占锁)为例进行分析,`ReentrantLock` 内部实现的 `tryAcquire()` 会调用到下边的 `nonfairTryAcquire()`
```JAVA
// ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 1、获取 AQS 中的 state 状态
int c = getState();
// 2、如果 state 为 0证明锁没有被其他线程占用
if (c == 0) {
// 2.1、通过 CAS 对 state 进行更新
if (compareAndSetState(0, acquires)) {
// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 3、如果当前线程和锁的持有线程相同说明发生了「锁的重入」
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 3.1、将锁的重入次数加 1
setState(nextc);
return true;
}
// 4、如果锁被其他线程占用就返回 false表示获取锁失败
return false;
}
```
`nonfairTryAcquire()` 方法内部,主要通过两个核心操作去完成资源的获取:
- 通过 `CAS` 更新 `state` 变量。`state == 0` 表示资源没有被占用。`state > 0` 表示资源被占用,此时 `state` 表示重入次数。
- 通过 `setExclusiveOwnerThread()` 设置持有资源的线程。
如果线程更新 `state` 变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。
#### `addWaiter()` 分析
在通过 `tryAcquire()` 方法尝试获取资源失败之后,会调用 `addWaiter()` 方法将当前线程封装为 Node 节点加入 `AQS` 内部的队列中。`addWaite()` 代码如下:
```JAVA
// AQS
private Node addWaiter(Node mode) {
// 1、将当前线程封装为 Node 节点。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 2、如果 pred = null则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。
if (pred != null) {
node.prev = pred;
// 2.1、通过 CAS 控制并发安全。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3、初始化队列并将新创建的 Node 节点加入队列。
enq(node);
return node;
}
```
**节点入队的并发安全:**
`addWaiter()` 方法中,需要执行 Node 节点 **入队** 的操作。由于是在多线程环境下,因此需要通过 `CAS` 操作保证并发安全。
通过 `CAS` 操作去更新 `tail` 指针指向新入队的 Node 节点,`CAS` 可以保证只有一个线程会成功修改 `tail` 指针,以此来保证 Node 节点入队时的并发安全。
**AQS 内部队列的初始化:**
在执行 `addWaiter()` 时,如果发现 `pred == null` ,即 `tail` 指针为 null则证明队列没有初始化需要调用 `enq()` 方法初始化队列,并将 `Node` 节点加入到初始化后的队列中,代码如下:
```JAVA
// AQS
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 1、通过 CAS 操作保证队列初始化的并发安全
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2、与 addWaiter() 方法中节点入队的操作相同
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
`enq()` 方法中初始化队列,在初始化过程中,也需要通过 `CAS` 来保证并发安全。
初始化队列总共包含两个步骤:初始化 `head` 节点、`tail` 指向 `head` 节点。
**初始化后的队列如下图所示:**
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure-init.png)
#### `acquireQueued()` 分析
为了方便阅读,这里再贴一下 `AQS``acquire()` 获取资源的代码:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 方法中,通过 `addWaiter()` 方法将 `Node` 节点加入队列之后,就会调用 `acquireQueued()` 方法。代码如下:
```JAVA
// AQS令队列中的节点尝试获取锁并且对线程进行阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1、尝试获取锁。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2、判断线程是否可以阻塞如果可以则阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取锁失败就会取消获取锁将节点状态更新为 CANCELLED。
if (failed)
cancelAcquire(node);
}
}
```
`acquireQueued()` 方法中,主要做两件事情:
- **尝试获取资源:** 当前线程加入队列之后,如果发现前继节点是 `head` 节点,说明当前线程是队列中第一个等待的节点,于是调用 `tryAcquire()` 尝试获取资源。
- **阻塞当前线程** :如果尝试获取资源失败,就需要阻塞当前线程,等待被唤醒之后获取资源。
**1、尝试获取资源**
`acquireQueued()` 方法中,尝试获取资源总共有 2 个步骤:
- `p == head` :表明当前节点的前继节点为 `head` 节点。此时当前节点为 AQS 队列中的第一个等待节点。
- `tryAcquire(arg) == true` :表明当前线程尝试获取资源成功。
在成功获取资源之后,就需要将当前线程的节点 **从等待队列中移除** 。移除操作为:将当前等待的线程节点设置为 `head` 节点(`head` 节点是虚拟节点,并不参与排队获取资源)。
**2、阻塞当前线程**
`AQS` 中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为 `CANCELLED` `CANCELLED` 状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过 `CANCELLED` 状态的节点。
通过 `shouldParkAfterFailedAcquire()` 方法来判断当前线程节点是否可以阻塞,如下:
```JAVA
// AQS判断当前线程节点是否可以阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1、前继节点状态正常直接返回 true 即可。
if (ws == Node.SIGNAL)
return true;
// 2、ws > 0 表示前继节点的状态异常,即为 CANCELLED 状态,需要跳过异常状态的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 3、如果前继节点的状态不是 SIGNAL也不是 CANCELLED就将状态设置为 SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
```
`shouldParkAfterFailedAcquire()` 方法中的判断逻辑:
- 如果发现前继节点的状态是 `SIGNAL` ,则可以阻塞当前线程。
- 如果发现前继节点的状态是 `CANCELLED` ,则需要跳过 `CANCELLED` 状态的节点。
- 如果发现前继节点的状态不是 `SIGNAL``CANCELLED` ,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为 `SIGNAL` ,表明该前继节点需要对后续节点进行唤醒。
当判断当前线程可以阻塞之后,通过调用 `parkAndCheckInterrupt()` 方法来阻塞当前线程。内部使用了 `LockSupport` 来实现阻塞。`LockSupoprt` 底层是基于 `Unsafe` 类来阻塞线程,代码如下:
```JAVA
// AQS
private final boolean parkAndCheckInterrupt() {
// 1、线程阻塞到这里
LockSupport.park(this);
// 2、线程被唤醒之后返回线程中断状态
return Thread.interrupted();
}
```
**为什么在线程被唤醒之后,要返回线程的中断状态呢?**
`parkAndCheckInterrupt()` 方法中,当执行完 `LockSupport.park(this)` ,线程会被阻塞,代码如下:
```JAVA
// AQS
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 线程被唤醒之后,需要返回线程中断状态
return Thread.interrupted();
}
```
当线程被唤醒之后,需要执行 `Thread.interrupted()` 来返回线程的中断状态,这是为什么呢?
这个和线程的中断协作机制有关系,线程被唤醒之后,并不确定是被中断唤醒,还是被 `LockSupport.unpark()` 唤醒,因此需要通过线程的中断状态来判断。
**在 `acquire()` 方法中,为什么需要调用 `selfInterrupt()` **
`acquire()` 方法代码如下:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 方法中,当 `if` 语句的条件返回 `true` 后,就会调用 `selfInterrupt()` ,该方法会中断当前线程,为什么需要中断当前线程呢?
`if` 判断为 `true` 时,需要 `tryAcquire()` 返回 `false` ,并且 `acquireQueued()` 返回 `true`
其中 `acquireQueued()` 方法返回的是线程被唤醒之后的 **中断状态** ,通过执行 `Thread.interrupted()` 来返回。该方法在返回中断状态的同时,会清除线程的中断状态。
因此如果 `if` 判断为 `true` ,表明线程的中断状态为 `true` ,但是调用 `Thread.interrupted()` 之后,线程的中断状态被清除为 `false` ,因此需要重新执行 `selfInterrupt()` 来重新设置线程的中断状态。
### AQS 资源释放源码分析(独占模式)
AQS 中以独占模式释放资源的入口方法是 `release()` ,代码如下:
```JAVA
// AQS
public final boolean release(int arg) {
// 1、尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 2、唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
```
`release()` 方法中,主要做两件事:尝试释放锁和唤醒后继节点。对应方法如下:
**1、尝试释放锁**
通过 `tryRelease()` 方法尝试释放锁,该方法为模板方法,由自定义同步器实现,因此这里仍然以 `ReentrantLock` 为例来讲解。
`ReentrantLock` 中实现的 `tryRelease()` 方法如下:
```JAVA
// ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 1、判断持有锁的线程是否为当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 2、如果 state 为 0则表明当前线程已经没有重入次数。因此将 free 更新为 true表明该线程会释放锁。
if (c == 0) {
free = true;
// 3、更新持有资源的线程为 null
setExclusiveOwnerThread(null);
}
// 4、更新 state 值
setState(c);
return free;
}
```
`tryRelease()` 方法中,会先计算释放锁之后的 `state` 值,判断 `state` 值是否为 0。
- 如果 `state == 0` ,表明该线程没有重入次数了,更新 `free = true` ,并修改持有资源的线程为 null表明该线程完全释放这把锁。
- 如果 `state != 0` ,表明该线程还存在重入次数,因此不更新 `free` 值,`free` 值为 `false` 表明该线程没有完全释放这把锁。
之后更新 `state` 值,并返回 `free` 值,`free` 值表明线程是否完全释放锁。
**2、唤醒后继节点**
如果 `tryRelease()` 返回 `true` ,表明线程已经没有重入次数了,锁已经被完全释放,因此需要唤醒后继节点。
在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: `h != null && h.waitStatus != 0` 。这里解释一下为什么要这样判断:
- `h == null` :表明 `head` 节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。
- `h != null && h.waitStatus == 0` :表明头节点刚刚初始化完毕(节点的初始化状态为 0后继节点线程还没有成功入队因此不需要对后续节点进行唤醒。当后继节点入队之后会将前继节点的状态修改为 `SIGNAL` ,表明需要对后继节点进行唤醒)
- `h != null && h.waitStatus != 0` :其中 `waitStatus` 有可能大于 0也有可能小于 0。其中 `> 0` 表明节点已经取消等待获取资源,`< 0` 表明节点处于正常等待状态。
接下来进入 `unparkSuccessor()` 方法查看如何唤醒后继节点:
```JAVA
// AQS这里的入参 node 为队列的头节点(虚拟头节点)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 1、将头节点的状态进行清除为后续的唤醒做准备。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 2、如果后继节点异常则需要从 tail 向前遍历,找到正常状态的节点进行唤醒。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 3、唤醒后继节点
LockSupport.unpark(s.thread);
}
```
`unparkSuccessor()` 中,如果头节点的状态 `< 0` (在正常情况下,只要有后继节点,头节点的状态应该为 `SIGNAL` ,即 -1表示需要对后继节点进行唤醒因此这里提前清除头节点的状态标识将状态修改为 0表示已经执行了对后续节点唤醒的操作。
如果 `s == null` 或者 `s.waitStatus > 0` ,表明后继节点异常,此时不能唤醒异常节点,而是要找到正常状态的节点进行唤醒。
因此需要从 `tail` 指针向前遍历,来找到第一个状态正常(`waitStatus <= 0`)的节点进行唤醒。
**为什么要从 `tail` 指针向前遍历,而不是从 `head` 指针向后遍历,寻找正常状态的节点呢?**
遍历的方向和 **节点的入队操作** 有关。入队方法如下:
```JAVA
// AQS节点入队方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 1、先修改 prev 指针。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 2、再修改 next 指针。
pred.next = node;
return node;
}
}
enq(node);
return node;
}
```
`addWaiter()` 方法中,`node` 节点入队需要修改 `node.prev``pred.next` 两个指针,但是这两个操作并不是 **原子操作** ,先修改了 `node.prev` 指针,之后才修改 `pred.next` 指针。
在极端情况下,可能会出现 `head` 节点的下一个节点状态为 `CANCELLED` ,此时新入队的节点仅更新了 `node.prev` 指针,还未更新 `pred.next` 指针,如下图:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-addWaiter.png)
这样如果从 `head` 指针向后遍历,无法找到新入队的节点,因此需要从 `tail` 指针向前遍历找到新入队的节点。
### 图解 AQS 工作原理
至此AQS 中以独占模式获取资源、释放资源的源码就讲完了。为了对 AQS 的工作原理、节点状态变化有一个更加清晰的认识,接下来会通过画图的方式来了解整个 AQS 的工作原理。
由于 AQS 是底层同步工具,获取和释放资源的方法并没有提供具体实现,因此这里基于 `ReentrantLock` 来画图进行讲解。
假设总共有 3 个线程尝试获取锁,线程分别为 `T1``T2``T3`
此时,假设线程 `T1` 先获取到锁,线程 `T2` 排队等待获取锁。在线程 `T2` 进入队列之前,需要对 AQS 内部队列进行初始化。`head` 节点在初始化后状态为 `0` 。AQS 内部初始化后的队列如下图:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process.png)
此时,线程 `T2` 尝试获取锁。由于线程 `T1` 持有锁,因此线程 `T2` 会进入队列中等待获取锁。同时会将前继节点( `head` 节点)的状态由 `0` 更新为 `SIGNAL` ,表示需要对 `head` 节点的后继节点进行唤醒。此时AQS 内部队列如下图所示:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-2.png)
此时,线程 `T3` 尝试获取锁。由于线程 `T1` 持有锁,因此线程 `T3` 会进入队列中等待获取锁。同时会将前继节点(线程 `T2` 节点)的状态由 `0` 更新为 `SIGNAL` ,表示线程 `T2` 节点需要对后继节点进行唤醒。此时AQS 内部队列如下图所示:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-3.png)
此时,假设线程 `T1` 释放锁,会唤醒后继节点 `T2` 。线程 `T2` 被唤醒后获取到锁,并且会从等待队列中退出。
这里线程 `T2` 节点退出等待队列并不是直接从队列移除,而是令线程 `T2` 节点成为新的 `head` 节点,以此来退出资源获取的等待。此时 AQS 内部队列如下所示:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-4.png)
此时,假设线程 `T2` 释放锁,会唤醒后继节点 `T3` 。线程 `T3` 获取到锁之后,同样也退出等待队列,即将线程 `T3` 节点变为 `head` 节点来退出资源获取的等待。此时 AQS 内部队列如下所示:
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-5.png)
## 常见同步工具类
下面介绍几个基于 AQS 的常见同步工具类。
@ -407,7 +908,7 @@ protected boolean tryReleaseShared(int releases) {
}
```
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 变体 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
```java
// 等待(也可以叫做加锁)

View File

@ -148,7 +148,7 @@ static class Entry extends WeakReference<ThreadLocal<?>> {
如果想要在异步场景下传递 `ThreadLocal` 值,有两种解决方案:
- `InheritableThreadLocal` `InheritableThreadLocal` 是 JDK1.2 提供的工具,继承自 `ThreadLocal` 。使用 `InheritableThreadLocal` 时,会在创建子线程时,令子线程继承父线程中的 `ThreadLocal` 值,但是无法支持线程池场景下的 `ThreadLocal` 值传递。
- `TransmittableThreadLocal` `TransmittableThreadLocal` (简称 TTL 是阿里巴巴开源的工具`TTL` 可以在线程池的场景下支持 `ThreadLocal` 值传递。
- `TransmittableThreadLocal` `TransmittableThreadLocal` (简称 TTL 是阿里巴巴开源的工具类,继承并加强了`InheritableThreadLocal`类,可以在线程池的场景下支持 `ThreadLocal` 值传递。项目地址:<https://github.com/alibaba/transmittable-thread-local>
#### `InheritableThreadLocal` 原理扩展
@ -203,11 +203,10 @@ TTL 改造的地方有两处:
</dependency>
```
#### 相关应用场景
#### 应用场景
**线上服务压测** 场景下,会使用 `ThreadLocal` 存储压测标记,来区分压测流量和线上真实流量。
如果使用默认的 `ThreadLocal` ,就会导致在异步线程、线程池场景下, `ThreadLocal` 存储的压测标记丢失,从而造成比较严重的后果。
1. **压测流量标记** 在压测场景中,使用 `ThreadLocal` 存储压测标记,用于区分压测流量和真实流量。如果标记丢失,可能导致压测流量被错误地当成线上流量处理。
2. **上下文传递**:在分布式系统中,传递链路追踪信息(如 Trace ID或用户上下文信息。
## 线程池