From 696b6d6105b82249ddd642712bb1d63ab7c99ee2 Mon Sep 17 00:00:00 2001 From: guide Date: Thu, 15 Dec 2022 22:02:20 +0800 Subject: [PATCH] =?UTF-8?q?[docs=20update]=E7=B2=BE=E7=AE=80=E5=AE=8C?= =?UTF-8?q?=E5=96=84=20aqs=20=E7=9A=84=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/database/redis/redis-questions-02.md | 8 +- docs/java/concurrent/aqs.md | 677 ++++++++---------- .../java-concurrent-questions-03.md | 4 +- .../advantages&disadvantages-of-jwt.md | 3 +- 4 files changed, 305 insertions(+), 387 deletions(-) diff --git a/docs/database/redis/redis-questions-02.md b/docs/database/redis/redis-questions-02.md index 43e04bdc..37a6cb53 100644 --- a/docs/database/redis/redis-questions-02.md +++ b/docs/database/redis/redis-questions-02.md @@ -303,8 +303,6 @@ _为什么会出现误判的情况呢? 我们还要从布隆过滤器的原理 #### 什么是缓存击穿? -缓存穿透中,请求的 key 既不存在于缓存中,也不存在于数据库中。 - 缓存击穿中,请求的 key 对应的是 **热点数据** ,该数据 **存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期)** 。这就可能会导致瞬时大量的请求直接打到了数据库上,对数据库造成了巨大的压力,可能直接就被这么多请求弄宕机了。 ![缓存击穿](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/github/javaguide/database/redis/redis-cache-breakdown.png) @@ -317,6 +315,12 @@ _为什么会出现误判的情况呢? 我们还要从布隆过滤器的原理 - 针对热点数据提前预热,将其存入缓存中并设置合理的过期时间比如秒杀场景下的数据在秒杀结束之前不过期。 - 请求数据库写数据到缓存之前,先获取互斥锁,保证只有一个请求会落到数据库上,减少数据库的压力。 +#### 缓存穿透和缓存击穿有什么区别? + +缓存穿透中,请求的 key 既不存在于缓存中,也不存在于数据库中。 + +缓存击穿中,请求的 key 对应的是 **热点数据** ,该数据 **存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期)** 。 + ### 缓存雪崩 #### 什么是缓存雪崩? diff --git a/docs/java/concurrent/aqs.md b/docs/java/concurrent/aqs.md index be4c5279..593296f1 100644 --- a/docs/java/concurrent/aqs.md +++ b/docs/java/concurrent/aqs.md @@ -5,17 +5,7 @@ tag: - Java并发 --- - -开始之前,先来看几道常见的面试题!建议你带着这些问题来看这篇文章: - -- 何为 AQS?AQS 原理了解吗? -- `CountDownLatch` 和 `CyclicBarrier` 了解吗?两者的区别是什么? -- 用过 `Semaphore` 吗?应用场景了解吗? -- ...... - -相关阅读:[从 ReentrantLock 的实现看AQS的原理及应用](./reentrantlock.md) - -## AQS 简单介绍 +## AQS 介绍 AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面。 @@ -28,180 +18,69 @@ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchron } ``` -AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`,`FutureTask`(jdk1.7) 等等皆是基于 AQS 的。 +AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`等等皆是基于 AQS 的。 ## AQS 原理 -> 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。 +> 👍推荐阅读:[从 ReentrantLock 的实现看 AQS 的原理及应用](./reentrantlock.md) -下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。 +在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。 -### AQS 原理概览 +### AQS 核心思想 -AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁**实现的,即将暂时获取不到锁的线程加入到队列中。 +AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁** 实现的,即将暂时获取不到锁的线程加入到队列中。 -> CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。 +CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。在 CLH 同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。 -看个 AQS(`AbstractQueuedSynchronizer`)原理图: +CLH 队列结构如下图所示: -![enter image description here](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png) +![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/40cb932a64694262993907ebda6a0bfe~tplv-k3u1fbpfcp-zoom-1.image) -AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。 +AQS(`AbstractQueuedSynchronizer`)的核心原理图(图源[Java 并发之 AQS 详解](https://www.cnblogs.com/waterystone/p/4920797.html))如下: + +![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png) + +AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **线程等待队列** 来完成获取资源线程的排队工作。 + +`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。 ```java -private volatile int state;//共享变量,使用volatile修饰保证线程可见性 +// 共享变量,使用volatile修饰保证线程可见性 +private volatile int state; ``` -状态信息通过 `protected` 类型的`getState()`,`setState()`,`compareAndSetState()` 进行操作 +另外,状态信息 `state` 可以通过 `protected` 类型的`getState()`、`setState()`和`compareAndSetState()` 进行操作。并且,这几个方法都是 `final` 修饰的,在子类中无法被重写。 ```java //返回同步状态的当前值 protected final int getState() { - return state; + return state; } // 设置同步状态的值 protected final void setState(int newState) { - state = newState; + state = newState; } //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值) protected final boolean compareAndSetState(int expect, int update) { - return unsafe.compareAndSwapInt(this, stateOffset, expect, update); + return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ``` -### AQS 对资源的共享方式 +以 `ReentrantLock` 为例,`state` 初始值为 0,表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()` 到 `state=`0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(`state` 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。 -AQS 定义两种资源共享方式 +再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。 -**1)Exclusive**(独占) +### AQS 资源共享方式 -只有一个线程能执行,如 `ReentrantLock`。又可分为公平锁和非公平锁,`ReentrantLock` 同时支持两种锁,下面以 `ReentrantLock` 对这两种锁的定义做介绍: +AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。 -- **公平锁** :按照线程在队列中的排队顺序,先到者先拿到锁 -- **非公平锁** :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。 +一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。 -> 说明:下面这部分关于 `ReentrantLock` 源代码内容节选自:https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,这是一篇很不错文章,推荐阅读。 - -**下面来看 `ReentrantLock` 中相关的源代码:** - -`ReentrantLock` 默认采用非公平锁,因为考虑获得更好的性能,通过 `boolean` 来决定是否用公平锁(传入 true 用公平锁)。 - -```java -/** Synchronizer providing all implementation mechanics */ -private final Sync sync; -public ReentrantLock() { - // 默认非公平锁 - sync = new NonfairSync(); -} -public ReentrantLock(boolean fair) { - sync = fair ? new FairSync() : new NonfairSync(); -} -``` - -`ReentrantLock` 中公平锁的 `lock` 方法 - -```java -static final class FairSync extends Sync { - final void lock() { - acquire(1); - } - // AbstractQueuedSynchronizer.acquire(int arg) - public final void acquire(int arg) { - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); - } - protected final boolean tryAcquire(int acquires) { - final Thread current = Thread.currentThread(); - int c = getState(); - if (c == 0) { - // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待 - if (!hasQueuedPredecessors() && - compareAndSetState(0, acquires)) { - setExclusiveOwnerThread(current); - return true; - } - } - else if (current == getExclusiveOwnerThread()) { - int nextc = c + acquires; - if (nextc < 0) - throw new Error("Maximum lock count exceeded"); - setState(nextc); - return true; - } - return false; - } -} -``` - -非公平锁的 `lock` 方法: - -```java -static final class NonfairSync extends Sync { - final void lock() { - // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了 - if (compareAndSetState(0, 1)) - setExclusiveOwnerThread(Thread.currentThread()); - else - acquire(1); - } - // AbstractQueuedSynchronizer.acquire(int arg) - public final void acquire(int arg) { - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); - } - protected final boolean tryAcquire(int acquires) { - return nonfairTryAcquire(acquires); - } -} -/** - * Performs non-fair tryLock. tryAcquire is implemented in - * subclasses, but both need nonfair try for trylock method. - */ -final boolean nonfairTryAcquire(int acquires) { - final Thread current = Thread.currentThread(); - int c = getState(); - if (c == 0) { - // 这里没有对阻塞队列进行判断 - if (compareAndSetState(0, acquires)) { - setExclusiveOwnerThread(current); - return true; - } - } - else if (current == getExclusiveOwnerThread()) { - int nextc = c + acquires; - if (nextc < 0) // overflow - throw new Error("Maximum lock count exceeded"); - setState(nextc); - return true; - } - return false; -} -``` - -总结:公平锁和非公平锁只有两处不同: - -1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。 -2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 `tryAcquire` 方法,在 `tryAcquire` 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。 - -公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。 - -相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。 - -**2)Share**(共享) - -多个线程可同时执行,如 `Semaphore/CountDownLatch`。`Semaphore`、`CountDownLatch`、 `CyclicBarrier`、`ReadWriteLock` 我们都会在后面讲到。 - -`ReentrantReadWriteLock` 可以看成是组合式,因为 `ReentrantReadWriteLock` 也就是读写锁允许多个线程同时对某一资源进行读。 - -不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。 - -### AQS 底层使用了模板方法模式 +### 自定义同步器 同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用): -1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放) +1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。 2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。 这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。 @@ -209,35 +88,102 @@ final boolean nonfairTryAcquire(int acquires) { **AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:** ```java -protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 -protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 -protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。 -protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 +//独占方式。尝试获取资源,成功则返回true,失败则返回false。 +protected boolean tryAcquire(int) +//独占方式。尝试释放资源,成功则返回true,失败则返回false。 +protected boolean tryRelease(int) +//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 +protected int tryAcquireShared(int) +//共享方式。尝试释放资源,成功则返回true,失败则返回false。 +protected boolean tryReleaseShared(int) +//该线程是否正在独占资源。只有用到condition才需要去实现它。 +protected boolean isHeldExclusively() ``` **什么是钩子方法呢?** 钩子方法是一种被声明在抽象类中的方法,一般使用 `protected` 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。 -篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:[用Java8 改造后的模板方法模式真的是 yyds!](https://mp.weixin.qq.com/s/zpScSCktFpnSWHWIQem2jg)。 +篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:[用 Java8 改造后的模板方法模式真的是 yyds!](https://mp.weixin.qq.com/s/zpScSCktFpnSWHWIQem2jg)。 除了上面提到的钩子方法之外,AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。 -以 `ReentrantLock` 为例,state 初始化为 0,表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()` 到 `state=`0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。 +## 常见同步工具类 -再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后` countDown()` 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。 +下面介绍几个基于 AQS 的常见同步工具类。 -一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。 +### Semaphore(信号量) -推荐两篇 AQS 原理和相关源码分析的文章: - -- [Java并发之AQS详解](https://www.cnblogs.com/waterystone/p/4920797.html) -- [Java并发包基石-AQS详解](https://www.cnblogs.com/chengxiao/p/7141160.html) - -## Semaphore(信号量) +#### 介绍 `synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。 -示例代码如下: +`Semaphore` 有两种模式:。 + +- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO; +- **非公平模式:** 抢占式的。 + +`Semaphore` 对应的两个构造方法如下: + +```java +public Semaphore(int permits) { + sync = new NonfairSync(permits); +} + +public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); +} +``` + +**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。** + +`Semaphore` 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。 + +#### 原理 + +`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。 + +调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state >= 0` 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果 `state<0` 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。 + +```java +/** + * 获取1个许可证 + */ +public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} +/** + * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 + */ +public final void acquireSharedInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + // 尝试获取许可证,arg为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。 + if (tryAcquireShared(arg) < 0) + doAcquireSharedInterruptibly(arg); +} +``` + +调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state>=0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。 + +```java +// 释放一个许可证 +public void release() { + sync.releaseShared(1); +} + +// 释放共享锁,同时会唤醒同步队列中的一个线程。 +public final boolean releaseShared(int arg) { + //释放共享锁 + if (tryReleaseShared(arg)) { + //唤醒同步队列中的一个线程 + doReleaseShared(); + return true; + } + return false; +} +``` + +#### 实战 ```java /** @@ -294,45 +240,31 @@ semaphore.release(5);// 释放5个许可 除了 `acquire()` 方法之外,另一个比较常用的与之对应的方法是 `tryAcquire()` 方法,该方法如果获取不到许可就立即返回 false。 -`Semaphore` 有两种模式,公平模式和非公平模式。 +[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645) : -- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO; -- **非公平模式:** 抢占式的。 +> `Semaphore` 与 `CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state` 为 `permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 +> 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。 -`Semaphore` 对应的两个构造方法如下: +### CountDownLatch (倒计时器) -```java - public Semaphore(int permits) { - sync = new NonfairSync(permits); - } - - public Semaphore(int permits, boolean fair) { - sync = fair ? new FairSync(permits) : new NonfairSync(permits); - } -``` - -**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。** - -[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645) :`Semaphore` 与 `CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 `permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 -如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。 - -## CountDownLatch (倒计时器) +#### 介绍 `CountDownLatch` 允许 `count` 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。 +`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。 + +#### 原理 + `CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。 -### CountDownLatch 的两种典型用法 +#### 实战 -**1、某一线程在开始运行前等待 n 个线程执行完毕。** +**CountDownLatch 的两种典型用法** : -将 `CountDownLatch` 的计数器初始化为 n (`new CountDownLatch(n)`),每当一个任务线程执行完毕,就将计数器减 1 (`countdownlatch.countDown()`),当计数器的值变为 0 时,在 `CountDownLatch 上 await()` 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。 +1. 某一线程在开始运行前等待 n 个线程执行完毕 : 将 `CountDownLatch` 的计数器初始化为 n (`new CountDownLatch(n)`),每当一个任务线程执行完毕,就将计数器减 1 (`countdownlatch.countDown()`),当计数器的值变为 0 时,在 `CountDownLatch 上 await()` 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。 +2. 实现多个线程开始执行任务的最大并行性 :注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 `CountDownLatch` 对象,将其计数器初始化为 1 (`new CountDownLatch(1)`),多个线程在开始执行任务前首先 `coundownlatch.await()`,当主线程调用 `countDown()` 时,计数器变为 0,多个线程同时被唤醒。 -**2、实现多个线程开始执行任务的最大并行性。** - -注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 `CountDownLatch` 对象,将其计数器初始化为 1 (`new CountDownLatch(1)`),多个线程在开始执行任务前首先 `coundownlatch.await()`,当主线程调用 `countDown()` 时,计数器变为 0,多个线程同时被唤醒。 - -### CountDownLatch 的使用示例 +**CountDownLatch 代码示例** : ```java /** @@ -393,17 +325,9 @@ for (int i = 0; i < threadCount-1; i++) { 这样就导致 `count` 的值没办法等于 0,然后就会导致一直等待。 -### CountDownLatch 的不足 +### CyclicBarrier(循环栅栏) -`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。 - -### CountDownLatch 相常见面试题 - -- `CountDownLatch` 怎么用?应用场景是什么? -- `CountDownLatch` 和 `CyclicBarrier` 的不同之处? -- `CountDownLatch` 类中主要的方法? - -## CyclicBarrier(循环栅栏) +#### 介绍 `CyclicBarrier` 和 `CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。 @@ -411,9 +335,20 @@ for (int i = 0; i < threadCount-1; i++) { `CyclicBarrier` 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。 -`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。 +#### 原理 -再来看一下它的构造函数: +`CyclicBarrier` 内部通过一个 `count` 变量作为计数器,`count` 的初始值为 `parties` 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。 + +```java +//每次拦截的线程数 +private final int parties; +//计数器 +private int count; +``` + +下面我们结合源码来简单看看。 + +1、`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。 ```java public CyclicBarrier(int parties) { @@ -428,172 +363,9 @@ public CyclicBarrier(int parties, Runnable barrierAction) { } ``` -其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。 +其中,`parties` 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。 -### CyclicBarrier 的应用场景 - -`CyclicBarrier` 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。 - -### CyclicBarrier 的使用示例 - -示例 1: - -```java -/** - * - * @author Snailclimb - * @date 2018年10月1日 - * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 - */ -public class CyclicBarrierExample2 { - // 请求的数量 - private static final int threadCount = 550; - // 需要同步的线程数量 - private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); - - public static void main(String[] args) throws InterruptedException { - // 创建线程池 - ExecutorService threadPool = Executors.newFixedThreadPool(10); - - for (int i = 0; i < threadCount; i++) { - final int threadNum = i; - Thread.sleep(1000); - threadPool.execute(() -> { - try { - test(threadNum); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (BrokenBarrierException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); - } - threadPool.shutdown(); - } - - public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { - System.out.println("threadnum:" + threadnum + "is ready"); - try { - /**等待60秒,保证子线程完全执行结束*/ - cyclicBarrier.await(60, TimeUnit.SECONDS); - } catch (Exception e) { - System.out.println("-----CyclicBarrierException------"); - } - System.out.println("threadnum:" + threadnum + "is finish"); - } - -} -``` - -运行结果,如下: - -``` -threadnum:0is ready -threadnum:1is ready -threadnum:2is ready -threadnum:3is ready -threadnum:4is ready -threadnum:4is finish -threadnum:0is finish -threadnum:1is finish -threadnum:2is finish -threadnum:3is finish -threadnum:5is ready -threadnum:6is ready -threadnum:7is ready -threadnum:8is ready -threadnum:9is ready -threadnum:9is finish -threadnum:5is finish -threadnum:8is finish -threadnum:7is finish -threadnum:6is finish -...... -``` - -可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, `await()` 方法之后的方法才被执行。 - -另外,`CyclicBarrier` 还提供一个更高级的构造函数 `CyclicBarrier(int parties, Runnable barrierAction)`,用于在线程到达屏障时,优先执行 `barrierAction`,方便处理更复杂的业务场景。示例代码如下: - -```java -/** - * - * @author SnailClimb - * @date 2018年10月1日 - * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable - */ -public class CyclicBarrierExample3 { - // 请求的数量 - private static final int threadCount = 550; - // 需要同步的线程数量 - private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { - System.out.println("------当线程数达到之后,优先执行------"); - }); - - public static void main(String[] args) throws InterruptedException { - // 创建线程池 - ExecutorService threadPool = Executors.newFixedThreadPool(10); - - for (int i = 0; i < threadCount; i++) { - final int threadNum = i; - Thread.sleep(1000); - threadPool.execute(() -> { - try { - test(threadNum); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (BrokenBarrierException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }); - } - threadPool.shutdown(); - } - - public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { - System.out.println("threadnum:" + threadnum + "is ready"); - cyclicBarrier.await(); - System.out.println("threadnum:" + threadnum + "is finish"); - } - -} -``` - -运行结果,如下: - -``` -threadnum:0is ready -threadnum:1is ready -threadnum:2is ready -threadnum:3is ready -threadnum:4is ready -------当线程数达到之后,优先执行------ -threadnum:4is finish -threadnum:0is finish -threadnum:2is finish -threadnum:1is finish -threadnum:3is finish -threadnum:5is ready -threadnum:6is ready -threadnum:7is ready -threadnum:8is ready -threadnum:9is ready -------当线程数达到之后,优先执行------ -threadnum:9is finish -threadnum:5is finish -threadnum:6is finish -threadnum:8is finish -threadnum:7is finish -...... -``` - -### CyclicBarrier 源码分析 - -当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。 +2、当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。 ```java public int await() throws InterruptedException, BrokenBarrierException { @@ -605,7 +377,7 @@ public int await() throws InterruptedException, BrokenBarrierException { } ``` -`dowait(false, 0L)`: +`dowait(false, 0L)`方法源码分析如下: ```java // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。 @@ -685,24 +457,163 @@ public int await() throws InterruptedException, BrokenBarrierException { lock.unlock(); } } - ``` -总结:`CyclicBarrier` 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。 +#### 实战 -### CyclicBarrier 和 CountDownLatch 的区别 +示例 1: -下面这个是国外一个大佬的回答: +```java +/** + * + * @author Snailclimb + * @date 2018年10月1日 + * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法 + */ +public class CyclicBarrierExample1 { + // 请求的数量 + private static final int threadCount = 550; + // 需要同步的线程数量 + private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); -`CountDownLatch` 是计数器,只能使用一次,而 `CyclicBarrier` 的计数器提供 `reset` 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的: + public static void main(String[] args) throws InterruptedException { + // 创建线程池 + ExecutorService threadPool = Executors.newFixedThreadPool(10); -> CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) -> CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。) + for (int i = 0; i < threadCount; i++) { + final int threadNum = i; + Thread.sleep(1000); + threadPool.execute(() -> { + try { + test(threadNum); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + } + threadPool.shutdown(); + } -对于 `CountDownLatch` 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 `CyclicBarrier`,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。 + public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { + System.out.println("threadnum:" + threadnum + "is ready"); + try { + /**等待60秒,保证子线程完全执行结束*/ + cyclicBarrier.await(60, TimeUnit.SECONDS); + } catch (Exception e) { + System.out.println("-----CyclicBarrierException------"); + } + System.out.println("threadnum:" + threadnum + "is finish"); + } -`CountDownLatch` 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 `CyclicBarrier` 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。 +} +``` -### ReentrantLock 和 ReentrantReadWriteLock +运行结果,如下: -`ReentrantLock` 和 `synchronized` 的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 `ReentrantReadWriteLock` 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。 +``` +threadnum:0is ready +threadnum:1is ready +threadnum:2is ready +threadnum:3is ready +threadnum:4is ready +threadnum:4is finish +threadnum:0is finish +threadnum:1is finish +threadnum:2is finish +threadnum:3is finish +threadnum:5is ready +threadnum:6is ready +threadnum:7is ready +threadnum:8is ready +threadnum:9is ready +threadnum:9is finish +threadnum:5is finish +threadnum:8is finish +threadnum:7is finish +threadnum:6is finish +...... +``` + +可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, `await()` 方法之后的方法才被执行。 + +另外,`CyclicBarrier` 还提供一个更高级的构造函数 `CyclicBarrier(int parties, Runnable barrierAction)`,用于在线程到达屏障时,优先执行 `barrierAction`,方便处理更复杂的业务场景。 + +示例 2: + +```java +/** + * + * @author SnailClimb + * @date 2018年10月1日 + * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable + */ +public class CyclicBarrierExample2 { + // 请求的数量 + private static final int threadCount = 550; + // 需要同步的线程数量 + private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { + System.out.println("------当线程数达到之后,优先执行------"); + }); + + public static void main(String[] args) throws InterruptedException { + // 创建线程池 + ExecutorService threadPool = Executors.newFixedThreadPool(10); + + for (int i = 0; i < threadCount; i++) { + final int threadNum = i; + Thread.sleep(1000); + threadPool.execute(() -> { + try { + test(threadNum); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + } + threadPool.shutdown(); + } + + public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { + System.out.println("threadnum:" + threadnum + "is ready"); + cyclicBarrier.await(); + System.out.println("threadnum:" + threadnum + "is finish"); + } + +} +``` + +运行结果,如下: + +``` +threadnum:0is ready +threadnum:1is ready +threadnum:2is ready +threadnum:3is ready +threadnum:4is ready +------当线程数达到之后,优先执行------ +threadnum:4is finish +threadnum:0is finish +threadnum:2is finish +threadnum:1is finish +threadnum:3is finish +threadnum:5is ready +threadnum:6is ready +threadnum:7is ready +threadnum:8is ready +threadnum:9is ready +------当线程数达到之后,优先执行------ +threadnum:9is finish +threadnum:5is finish +threadnum:6is finish +threadnum:8is finish +threadnum:7is finish +...... +``` diff --git a/docs/java/concurrent/java-concurrent-questions-03.md b/docs/java/concurrent/java-concurrent-questions-03.md index dce0ac68..478b8bd3 100644 --- a/docs/java/concurrent/java-concurrent-questions-03.md +++ b/docs/java/concurrent/java-concurrent-questions-03.md @@ -538,12 +538,14 @@ protected boolean isHeldExclusively()//该线程是否正在独占资源。只 - https://www.cnblogs.com/waterystone/p/4920797.html - https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html -### AQS 组件总结 +### AQS 常用的组件有哪些? - **`Semaphore`(信号量)-允许多个线程同时访问:** `synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。 - **`CountDownLatch `(倒计时器):** `CountDownLatch` 是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。 - **`CyclicBarrier`(循环栅栏):** `CyclicBarrier` 和 `CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。`CyclicBarrier` 的字面意思是可循环使用(`Cyclic`)的屏障(`Barrier`)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。 + + ### 用过 CountDownLatch 么?什么场景下用的? `CountDownLatch` 的作用就是 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 `CountDownLatch` 。具体场景是下面这样的: diff --git a/docs/system-design/security/advantages&disadvantages-of-jwt.md b/docs/system-design/security/advantages&disadvantages-of-jwt.md index d5ea7750..19f5f277 100644 --- a/docs/system-design/security/advantages&disadvantages-of-jwt.md +++ b/docs/system-design/security/advantages&disadvantages-of-jwt.md @@ -157,7 +157,8 @@ JWT 认证的话,我们应该如何解决续签问题呢?查阅了很多资 - 需要客户端来配合; - 用户注销的时候需要同时保证两个 JWT 都无效; -- 重新请求获取 JWT 的过程中会有短暂 JWT 不可用的情况(可以通过在客户端设置定时器,当 accessJWT 快过期的时候,提前去通过 refreshJWT 获取新的 accessJWT)。 +- 重新请求获取 JWT 的过程中会有短暂 JWT 不可用的情况(可以通过在客户端设置定时器,当 accessJWT 快过期的时候,提前去通过 refreshJWT 获取新的 accessJWT); +- 存在安全问题,只要拿到了未过期的 refreshJWT 就一直可以获取到 accessJWT。 ## 总结