diff --git a/docs/java/concurrent/aqs.md b/docs/java/concurrent/aqs.md index 470ecce8..67b52073 100644 --- a/docs/java/concurrent/aqs.md +++ b/docs/java/concurrent/aqs.md @@ -40,7 +40,7 @@ AQS(`AbstractQueuedSynchronizer`)的核心原理图: ![CLH 队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png) -AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/同步队列** 来完成获取资源线程的排队工作。 +AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。 `state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。 @@ -120,9 +120,9 @@ protected boolean isHeldExclusively() `synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。 -Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 +`Semaphore` 的使用简单,我们这里假设有 `N(N>5)` 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 -```java +```java // 初始共享资源数量 final Semaphore semaphore = new Semaphore(5); // 获取1个许可 @@ -158,41 +158,46 @@ public Semaphore(int permits, boolean fair) { `Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。 -调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。 +以无参 `acquire` 方法为例,调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。 -如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入阻塞队列,挂起当前线程。 +如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。 ```java -/** - * 获取1个许可证 - */ +// 获取1个许可证 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } -/** - * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 - */ + +// 获取一个或者多个许可证 +public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); +} +``` + +`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。 + +```java +// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - // 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入阻塞队列,挂起当前线程。 + // 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } -/** - * 共享模式下尝试获取资源(在Semaphore中的资源即许可证): - * 1、获取失败,返回负值 - * 2、共享模式下获取成功,但后续的共享模式获取会失败,返回0 - * 3、共享模式获取成功,随后的共享模式也可能获取成功,返回正值 - */ +``` + +这里再以非公平模式(`NonfairSync`)的为例,看看 `tryAcquireShared` 方法的实现。 + +```java +// 共享模式下尝试获取资源(在Semaphore中的资源即许可证): protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } -/** - * 非公平的共享模式获取许可证,acquires为许可证数量,根据代码上下文可知该值总是为1 - * 注:公平模式的实现会先判断队列中是否有节点在排队,有则直接返回-1,表示获取失败,没有则执行下面的操作 - */ + +// 非公平的共享模式获取许可证 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 当前可用许可证数量 @@ -209,7 +214,7 @@ final int nonfairTryAcquireShared(int acquires) { } ``` -调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。 +以无参 `release` 方法为例,调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入等待队列,挂起线程。 ```java // 释放一个许可证 @@ -217,39 +222,66 @@ public void release() { sync.releaseShared(1); } -// 释放共享锁,同时会唤醒同步队列中的一个线程。 +// 释放一个或者多个许可证 +public void release(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.releaseShared(permits); +} +``` + +`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。 + +```java +// 释放共享锁 +// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。 public final boolean releaseShared(int arg) { //释放共享锁 if (tryReleaseShared(arg)) { - //唤醒同步队列中的一个线程 + //释放当前节点的后置等待节点 doReleaseShared(); return true; } return false; } +``` + +`tryReleaseShared` 方法是`Semaphore` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。 + +```java +// 内部类 Sync 中重写的一个方法 // 尝试释放资源 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); - int next = current + releases; // 可用许可证+1 + // 可用许可证+1 + int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); - if (compareAndSetState(current, next)) // 通过CAS修改 + // CAS修改state的值 + if (compareAndSetState(current, next)) return true; } } ``` +可以看到,上面提到的几个方法底层基本都是通过同步器 `sync` 实现的。`Sync` 是 `CountDownLatch` 的内部类 , 继承了 `AbstractQueuedSynchronizer` ,重写了其中的某些方法。并且,Sync 对应的还有两个子类 `NonfairSync`(对应非公平模式) 和 `FairSync`(对应公平模式)。 + +```java +private static final class Sync extends AbstractQueuedSynchronizer { + // ... +} +static final class NonfairSync extends Sync { + // ... +} +static final class FairSync extends Sync { + // ... +} +``` + #### 实战 ```java -/** - * - * @author Snailclimb - * @date 2018年9月30日 - * @Description: 需要一次性拿一个许可的情况 - */ -public class SemaphoreExample1 { +public class SemaphoreExample { // 请求的数量 private static final int threadCount = 550; @@ -299,7 +331,7 @@ semaphore.release(5);// 释放5个许可 [issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645): -> `Semaphore` 与 `CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state` 为 `permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 +> `Semaphore` 与 `CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state` 为 `permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入等待队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 > 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。 ### CountDownLatch (倒计时器) @@ -312,7 +344,104 @@ semaphore.release(5);// 释放5个许可 #### 原理 -`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。 +`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。这个我们通过 `CountDownLatch` 的构造方法即可看出。 + +```java +public CountDownLatch(int count) { + if (count < 0) throw new IllegalArgumentException("count < 0"); + this.sync = new Sync(count); +} + +private static final class Sync extends AbstractQueuedSynchronizer { + Sync(int count) { + setState(count); + } + //... +} +``` + +当线程调用 `countDown()` 时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当 `state` 为 0 时,表示所有的线程都调用了 `countDown` 方法,那么在 `CountDownLatch` 上等待的线程就会被唤醒并继续执行。 + +```java +public void countDown() { + // Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer + sync.releaseShared(1); +} +``` + +`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。 + +```java +// 释放共享锁 +// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。 +public final boolean releaseShared(int arg) { + //释放共享锁 + if (tryReleaseShared(arg)) { + //释放当前节点的后置等待节点 + doReleaseShared(); + return true; + } + return false; +} +``` + +`tryReleaseShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。 + +```java +// 对 state 进行递减,直到 state 变成 0; +// 只有 count 递减到 0 时,countDown 才会返回 true +protected boolean tryReleaseShared(int releases) { + // 自选检查 state 是否为 0 + for (;;) { + int c = getState(); + // 如果 state 已经是 0 了,直接返回 false + if (c == 0) + return false; + // 对 state 进行递减 + int nextc = c-1; + // CAS 操作更新 state 的值 + if (compareAndSetState(c, nextc)) + return nextc == 0; + } +} +``` + +以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。 + +```java +// 等待(也可以叫做加锁) +public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} +// 带有超时时间的等待 +public boolean await(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); +} +``` + +`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。 + +```java +// 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程 +public final void acquireSharedInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + // 尝试获得锁,获取成功则返回 + if (tryAcquireShared(arg) < 0) + // 获取失败加入等待队列,挂起线程 + doAcquireSharedInterruptibly(arg); +} +``` + +`tryAcquireShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法,其作用就是判断 `state` 的值是否为 0,是的话就返回 1,否则返回 -1。 + +```java +protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; +} +``` #### 实战 @@ -324,30 +453,25 @@ semaphore.release(5);// 释放5个许可 **CountDownLatch 代码示例**: ```java -/** - * - * @author SnailClimb - * @date 2018年10月1日 - * @Description: CountDownLatch 使用方法示例 - */ -public class CountDownLatchExample1 { +public class CountDownLatchExample { // 请求的数量 - private static final int threadCount = 550; + private static final int THREAD_COUNT = 550; public static void main(String[] args) throws InterruptedException { // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) + // 只是测试使用,实际场景请手动赋值线程池参数 ExecutorService threadPool = Executors.newFixedThreadPool(300); - final CountDownLatch countDownLatch = new CountDownLatch(threadCount); - for (int i = 0; i < threadCount; i++) { - final int threadnum = i; - threadPool.execute(() -> {// Lambda 表达式的运用 + final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); + for (int i = 0; i < THREAD_COUNT; i++) { + final int threadNum = i; + threadPool.execute(() -> { try { - test(threadnum); + test(threadNum); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); } finally { - countDownLatch.countDown();// 表示一个请求已经被完成 + // 表示一个请求已经被完成 + countDownLatch.countDown(); } }); @@ -358,12 +482,11 @@ public class CountDownLatchExample1 { } public static void test(int threadnum) throws InterruptedException { - Thread.sleep(1000);// 模拟请求的耗时操作 - System.out.println("threadnum:" + threadnum); - Thread.sleep(1000);// 模拟请求的耗时操作 + Thread.sleep(1000); + System.out.println("threadNum:" + threadnum); + Thread.sleep(1000); } } - ``` 上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行`System.out.println("finish");`。 @@ -521,12 +644,6 @@ public int await() throws InterruptedException, BrokenBarrierException { 示例 1: ```java -/** - * - * @author Snailclimb - * @date 2018年10月1日 - * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 - */ public class CyclicBarrierExample1 { // 请求的数量 private static final int threadCount = 550; @@ -602,12 +719,6 @@ threadnum:6is finish 示例 2: ```java -/** - * - * @author SnailClimb - * @date 2018年10月1日 - * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable - */ public class CyclicBarrierExample2 { // 请求的数量 private static final int threadCount = 550;