1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

Update aqs.md

This commit is contained in:
Guide 2023-07-13 09:08:59 +08:00
parent e680941f43
commit f69d399a58

View File

@ -40,7 +40,7 @@ AQS(`AbstractQueuedSynchronizer`)的核心原理图:
![CLH 队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png) ![CLH 队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/同步队列** 来完成获取资源线程的排队工作。 AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。 `state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
@ -120,7 +120,7 @@ protected boolean isHeldExclusively()
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。 `synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。
Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 `Semaphore` 的使用简单,我们这里假设有 `N(N>5)` 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
```java ```java
// 初始共享资源数量 // 初始共享资源数量
@ -158,41 +158,46 @@ public Semaphore(int permits, boolean fair) {
`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。 `Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。 以无参 `acquire` 方法为例,调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。
如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入阻塞队列,挂起当前线程。 如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。
```java ```java
/** // 获取1个许可证
* 获取1个许可证
*/
public void acquire() throws InterruptedException { public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); sync.acquireSharedInterruptibly(1);
} }
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 // 获取一个或者多个许可证
*/ public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
```
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg) public final void acquireSharedInterruptibly(int arg)
throws InterruptedException { throws InterruptedException {
if (Thread.interrupted()) if (Thread.interrupted())
throw new InterruptedException(); throw new InterruptedException();
// 尝试获取许可证arg为获取许可证个数当获取失败时,则创建一个节点加入阻塞队列,挂起当前线程。 // 尝试获取许可证arg为获取许可证个数当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0) if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); doAcquireSharedInterruptibly(arg);
} }
/** ```
* 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
* 1、获取失败返回负值 这里再以非公平模式(`NonfairSync`)的为例,看看 `tryAcquireShared` 方法的实现。
* 2、共享模式下获取成功但后续的共享模式获取会失败返回0
* 3、共享模式获取成功随后的共享模式也可能获取成功返回正值 ```java
*/ // 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
protected int tryAcquireShared(int acquires) { protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); return nonfairTryAcquireShared(acquires);
} }
/**
* 非公平的共享模式获取许可证acquires为许可证数量根据代码上下文可知该值总是为1 // 非公平的共享模式获取许可证
* 注:公平模式的实现会先判断队列中是否有节点在排队,有则直接返回-1表示获取失败没有则执行下面的操作
*/
final int nonfairTryAcquireShared(int acquires) { final int nonfairTryAcquireShared(int acquires) {
for (;;) { for (;;) {
// 当前可用许可证数量 // 当前可用许可证数量
@ -209,7 +214,7 @@ final int nonfairTryAcquireShared(int acquires) {
} }
``` ```
调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。 以无参 `release` 方法为例,调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入等待队列,挂起线程。
```java ```java
// 释放一个许可证 // 释放一个许可证
@ -217,39 +222,66 @@ public void release() {
sync.releaseShared(1); sync.releaseShared(1);
} }
// 释放共享锁,同时会唤醒同步队列中的一个线程。 // 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
```
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 释放共享锁
// 如果 tryReleaseShared 返回 true就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) { public final boolean releaseShared(int arg) {
//释放共享锁 //释放共享锁
if (tryReleaseShared(arg)) { if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程 //释放当前节点的后置等待节点
doReleaseShared(); doReleaseShared();
return true; return true;
} }
return false; return false;
} }
```
`tryReleaseShared` 方法是`Semaphore` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
```java
// 内部类 Sync 中重写的一个方法
// 尝试释放资源 // 尝试释放资源
protected final boolean tryReleaseShared(int releases) { protected final boolean tryReleaseShared(int releases) {
for (;;) { for (;;) {
int current = getState(); int current = getState();
int next = current + releases; // 可用许可证+1 // 可用许可证+1
int next = current + releases;
if (next < current) // overflow if (next < current) // overflow
throw new Error("Maximum permit count exceeded"); throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 通过CAS修改 // CAS修改state的值
if (compareAndSetState(current, next))
return true; return true;
} }
} }
``` ```
可以看到,上面提到的几个方法底层基本都是通过同步器 `sync` 实现的。`Sync``CountDownLatch` 的内部类 , 继承了 `AbstractQueuedSynchronizer` 重写了其中的某些方法。并且Sync 对应的还有两个子类 `NonfairSync`(对应非公平模式) 和 `FairSync`(对应公平模式)。
```java
private static final class Sync extends AbstractQueuedSynchronizer {
// ...
}
static final class NonfairSync extends Sync {
// ...
}
static final class FairSync extends Sync {
// ...
}
```
#### 实战 #### 实战
```java ```java
/** public class SemaphoreExample {
*
* @author Snailclimb
* @date 2018年9月30日
* @Description: 需要一次性拿一个许可的情况
*/
public class SemaphoreExample1 {
// 请求的数量 // 请求的数量
private static final int threadCount = 550; private static final int threadCount = 550;
@ -299,7 +331,7 @@ semaphore.release(5);// 释放5个许可
[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645) [issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645)
> `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state``permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1那么自旋的线程便会判断成功。 > `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state``permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入等待队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1那么自旋的线程便会判断成功。
> 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。 > 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。
### CountDownLatch (倒计时器) ### CountDownLatch (倒计时器)
@ -312,7 +344,104 @@ semaphore.release(5);// 释放5个许可
#### 原理 #### 原理
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。 `CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。这个我们通过 `CountDownLatch` 的构造方法即可看出。
```java
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
//...
}
```
当线程调用 `countDown()` 时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当 `state` 为 0 时,表示所有的线程都调用了 `countDown` 方法,那么在 `CountDownLatch` 上等待的线程就会被唤醒并继续执行。
```java
public void countDown() {
// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
sync.releaseShared(1);
}
```
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 释放共享锁
// 如果 tryReleaseShared 返回 true就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}
```
`tryReleaseShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
```java
// 对 state 进行递减,直到 state 变成 0
// 只有 count 递减到 0 时countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {
// 自选检查 state 是否为 0
for (;;) {
int c = getState();
// 如果 state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
// CAS 操作更新 state 的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
```
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
```java
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
```
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获得锁,获取成功则返回
if (tryAcquireShared(arg) < 0)
// 获取失败加入等待队列,挂起线程
doAcquireSharedInterruptibly(arg);
}
```
`tryAcquireShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法,其作用就是判断 `state` 的值是否为 0是的话就返回 1否则返回 -1。
```java
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
```
#### 实战 #### 实战
@ -324,30 +453,25 @@ semaphore.release(5);// 释放5个许可
**CountDownLatch 代码示例** **CountDownLatch 代码示例**
```java ```java
/** public class CountDownLatchExample {
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: CountDownLatch 使用方法示例
*/
public class CountDownLatchExample1 {
// 请求的数量 // 请求的数量
private static final int threadCount = 550; private static final int THREAD_COUNT = 550;
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
// 只是测试使用,实际场景请手动赋值线程池参数
ExecutorService threadPool = Executors.newFixedThreadPool(300); ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount); final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < THREAD_COUNT; i++) {
final int threadnum = i; final int threadNum = i;
threadPool.execute(() -> {// Lambda 表达式的运用 threadPool.execute(() -> {
try { try {
test(threadnum); test(threadNum);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
countDownLatch.countDown();// 表示一个请求已经被完成 // 表示一个请求已经被完成
countDownLatch.countDown();
} }
}); });
@ -358,12 +482,11 @@ public class CountDownLatchExample1 {
} }
public static void test(int threadnum) throws InterruptedException { public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作 Thread.sleep(1000);
System.out.println("threadnum:" + threadnum); System.out.println("threadNum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作 Thread.sleep(1000);
} }
} }
``` ```
上面的代码中,我们定义了请求的数量为 550当这 550 个请求被处理完成之后,才会执行`System.out.println("finish");` 上面的代码中,我们定义了请求的数量为 550当这 550 个请求被处理完成之后,才会执行`System.out.println("finish");`
@ -521,12 +644,6 @@ public int await() throws InterruptedException, BrokenBarrierException {
示例 1 示例 1
```java ```java
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample1 { public class CyclicBarrierExample1 {
// 请求的数量 // 请求的数量
private static final int threadCount = 550; private static final int threadCount = 550;
@ -602,12 +719,6 @@ threadnum:6is finish
示例 2 示例 2
```java ```java
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
*/
public class CyclicBarrierExample2 { public class CyclicBarrierExample2 {
// 请求的数量 // 请求的数量
private static final int threadCount = 550; private static final int threadCount = 550;