1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-08-14 05:21:42 +08:00

Compare commits

...

2 Commits

Author SHA1 Message Date
guide
9f66ae82de [docs update]完善Java 并发常见面试题总结(下)的内容 2022-12-15 22:42:21 +08:00
guide
696b6d6105 [docs update]精简完善 aqs 的内容 2022-12-15 22:02:20 +08:00
5 changed files with 772 additions and 810 deletions

View File

@ -303,8 +303,6 @@ _为什么会出现误判的情况呢? 我们还要从布隆过滤器的原理
#### 什么是缓存击穿?
缓存穿透中,请求的 key 既不存在于缓存中,也不存在于数据库中。
缓存击穿中,请求的 key 对应的是 **热点数据** ,该数据 **存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期)** 。这就可能会导致瞬时大量的请求直接打到了数据库上,对数据库造成了巨大的压力,可能直接就被这么多请求弄宕机了。
![缓存击穿](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/github/javaguide/database/redis/redis-cache-breakdown.png)
@ -317,6 +315,12 @@ _为什么会出现误判的情况呢? 我们还要从布隆过滤器的原理
- 针对热点数据提前预热,将其存入缓存中并设置合理的过期时间比如秒杀场景下的数据在秒杀结束之前不过期。
- 请求数据库写数据到缓存之前,先获取互斥锁,保证只有一个请求会落到数据库上,减少数据库的压力。
#### 缓存穿透和缓存击穿有什么区别?
缓存穿透中,请求的 key 既不存在于缓存中,也不存在于数据库中。
缓存击穿中,请求的 key 对应的是 **热点数据** ,该数据 **存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期)**
### 缓存雪崩
#### 什么是缓存雪崩?

View File

@ -5,17 +5,7 @@ tag:
- Java并发
---
开始之前,先来看几道常见的面试题!建议你带着这些问题来看这篇文章:
- 何为 AQSAQS 原理了解吗?
- `CountDownLatch``CyclicBarrier` 了解吗?两者的区别是什么?
- 用过 `Semaphore` 吗?应用场景了解吗?
- ......
相关阅读:[从 ReentrantLock 的实现看AQS的原理及应用](./reentrantlock.md)
## AQS 简单介绍
## AQS 介绍
AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面。
@ -28,180 +18,69 @@ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchron
}
```
AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue``FutureTask`(jdk1.7) 等等皆是基于 AQS 的。
AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
## AQS 原理
> 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
> 👍推荐阅读:[从 ReentrantLock 的实现看 AQS 的原理及应用](./reentrantlock.md)
下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来
### AQS 原理概览
### AQS 核心思想
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁**实现的,即将暂时获取不到锁的线程加入到队列中。
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁** 实现的,即将暂时获取不到锁的线程加入到队列中。
> CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。
CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。在 CLH 同步队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
看个 AQS(`AbstractQueuedSynchronizer`)原理图
CLH 队列结构如下图所示
![enter image description here](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/40cb932a64694262993907ebda6a0bfe~tplv-k3u1fbpfcp-zoom-1.image)
AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
AQS(`AbstractQueuedSynchronizer`)的核心原理图(图源[Java 并发之 AQS 详解](https://www.cnblogs.com/waterystone/p/4920797.html))如下:
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png)
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **线程等待队列** 来完成获取资源线程的排队工作。
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
```java
private volatile int state;//共享变量使用volatile修饰保证线程可见性
// 共享变量使用volatile修饰保证线程可见性
private volatile int state;
```
状态信息通过 `protected` 类型的`getState()``setState()``compareAndSetState()` 进行操作
另外,状态信息 `state` 可以通过 `protected` 类型的`getState()``setState()``compareAndSetState()` 进行操作。并且,这几个方法都是 `final` 修饰的,在子类中无法被重写。
```java
//返回同步状态的当前值
protected final int getState() {
return state;
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
state = newState;
}
//原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```
### AQS 对资源的共享方式
`ReentrantLock` 为例,`state` 初始值为 0表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()``state=`0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
AQS 定义两种资源共享方式
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。
**1)Exclusive**(独占)
### AQS 资源共享方式
只有一个线程能执行,如 `ReentrantLock`。又可分为公平锁和非公平锁,`ReentrantLock` 同时支持两种锁,下面以 `ReentrantLock` 对这两种锁的定义做介绍:
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
- **公平锁** :按照线程在队列中的排队顺序,先到者先拿到锁
- **非公平锁** :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
> 说明:下面这部分关于 `ReentrantLock` 源代码内容节选自https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,这是一篇很不错文章,推荐阅读。
**下面来看 `ReentrantLock` 中相关的源代码:**
`ReentrantLock` 默认采用非公平锁,因为考虑获得更好的性能,通过 `boolean` 来决定是否用公平锁(传入 true 用公平锁)。
```java
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
```
`ReentrantLock` 中公平锁的 `lock` 方法
```java
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
```
非公平锁的 `lock` 方法:
```java
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平锁相比这里会直接先进行一次CAS成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里没有对阻塞队列进行判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```
总结:公平锁和非公平锁只有两处不同:
1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 `tryAcquire` 方法,在 `tryAcquire` 方法中如果发现锁这个时候被释放了state == 0非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
**2)Share**(共享)
多个线程可同时执行,如 `Semaphore/CountDownLatch``Semaphore``CountDownLatch``CyclicBarrier``ReadWriteLock` 我们都会在后面讲到。
`ReentrantReadWriteLock` 可以看成是组合式,因为 `ReentrantReadWriteLock` 也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等AQS 已经在上层已经帮我们实现好了。
### AQS 底层使用了模板方法模式
### 自定义同步器
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。
2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
@ -209,35 +88,115 @@ final boolean nonfairTryAcquire(int acquires) {
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
```java
protected boolean tryAcquire(int)//独占方式。尝试获取资源成功则返回true失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源成功则返回true失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源成功则返回true失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
//独占方式。尝试获取资源成功则返回true失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源成功则返回true失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源成功则返回true失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
```
**什么是钩子方法呢?** 钩子方法是一种被声明在抽象类中的方法,一般使用 `protected` 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。
篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:[用Java8 改造后的模板方法模式真的是 yyds!](https://mp.weixin.qq.com/s/zpScSCktFpnSWHWIQem2jg)。
篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:[ Java8 改造后的模板方法模式真的是 yyds!](https://mp.weixin.qq.com/s/zpScSCktFpnSWHWIQem2jg)。
除了上面提到的钩子方法之外AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。
`ReentrantLock` 为例state 初始化为 0表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()``state=`0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
## 常见同步工具类
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行state 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后` countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作
下面介绍几个基于 AQS 的常见同步工具类
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
### Semaphore(信号量)
推荐两篇 AQS 原理和相关源码分析的文章:
#### 介绍
- [Java并发之AQS详解](https://www.cnblogs.com/waterystone/p/4920797.html)
- [Java并发包基石-AQS详解](https://www.cnblogs.com/chengxiao/p/7141160.html)
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。
## Semaphore(信号量)
Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到贡献资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。
```java 
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
```
示例代码如下:
当初始的资源个数为 1 的时候,`Semaphore` 退化为排他锁。
`Semaphore` 有两种模式:。
- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO
- **非公平模式:** 抢占式的。
`Semaphore` 对应的两个构造方法如下:
```java
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。**
`Semaphore` 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。
#### 原理
`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state >= 0` 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果 `state<0` 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。
```java
/**
* 获取1个许可证
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证arg为获取许可证个数当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
```
调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state>=0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。
```java
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}
// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}
```
#### 实战
```java
/**
@ -253,7 +212,7 @@ public class SemaphoreExample1 {
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允许执行的线程数量。
// 初始许可证数量
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
@ -294,45 +253,31 @@ semaphore.release(5);// 释放5个许可
除了 `acquire()` 方法之外,另一个比较常用的与之对应的方法是 `tryAcquire()` 方法,该方法如果获取不到许可就立即返回 false。
`Semaphore` 有两种模式,公平模式和非公平模式。
[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645)
- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO
- **非公平模式:** 抢占式的
> `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state``permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1那么自旋的线程便会判断成功。
> 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量
`Semaphore` 对应的两个构造方法如下:
### CountDownLatch (倒计时器)
```java
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。**
[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645) `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 `permits`。当执行任务的线程数量超出 `permits`,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1那么自旋的线程便会判断成功。
如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。
## CountDownLatch (倒计时器)
#### 介绍
`CountDownLatch` 允许 `count` 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。
#### 原理
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
### CountDownLatch 的两种典型用法
#### 实战
**1、某一线程在开始运行前等待 n 个线程执行完毕。**
**CountDownLatch 的两种典型用法**
`CountDownLatch` 的计数器初始化为 n `new CountDownLatch(n)`),每当一个任务线程执行完毕,就将计数器减 1 `countdownlatch.countDown()`),当计数器的值变为 0 时,在 `CountDownLatch 上 await()` 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
1. 某一线程在开始运行前等待 n 个线程执行完毕 : 将 `CountDownLatch` 的计数器初始化为 n `new CountDownLatch(n)`),每当一个任务线程执行完毕,就将计数器减 1 `countdownlatch.countDown()`),当计数器的值变为 0 时,在 `CountDownLatch 上 await()` 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2. 实现多个线程开始执行任务的最大并行性 :注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 `CountDownLatch` 对象,将其计数器初始化为 1 `new CountDownLatch(1)`),多个线程在开始执行任务前首先 `coundownlatch.await()`,当主线程调用 `countDown()` 时,计数器变为 0多个线程同时被唤醒。
**2、实现多个线程开始执行任务的最大并行性。**
注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 `CountDownLatch` 对象,将其计数器初始化为 1 `new CountDownLatch(1)`),多个线程在开始执行任务前首先 `coundownlatch.await()`,当主线程调用 `countDown()` 时,计数器变为 0多个线程同时被唤醒。
### CountDownLatch 的使用示例
**CountDownLatch 代码示例**
```java
/**
@ -393,17 +338,9 @@ for (int i = 0; i < threadCount-1; i++) {
这样就导致 `count` 的值没办法等于 0然后就会导致一直等待。
### CountDownLatch 的不足
### CyclicBarrier(循环栅栏)
`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。
### CountDownLatch 相常见面试题
- `CountDownLatch` 怎么用?应用场景是什么?
- `CountDownLatch``CyclicBarrier` 的不同之处?
- `CountDownLatch` 类中主要的方法?
## CyclicBarrier(循环栅栏)
#### 介绍
`CyclicBarrier``CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。
@ -411,9 +348,20 @@ for (int i = 0; i < threadCount-1; i++) {
`CyclicBarrier` 的字面意思是可循环使用Cyclic的屏障Barrier。它要做的事情是让一组线程到达一个屏障也可以叫同步点时被阻塞直到最后一个线程到达屏障时屏障才会开门所有被屏障拦截的线程才会继续干活。
`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。
#### 原理
再来看一下它的构造函数:
`CyclicBarrier` 内部通过一个 `count` 变量作为计数器,`count` 的初始值为 `parties` 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
```java
//每次拦截的线程数
private final int parties;
//计数器
private int count;
```
下面我们结合源码来简单看看。
1、`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。
```java
public CyclicBarrier(int parties) {
@ -428,172 +376,9 @@ public CyclicBarrier(int parties, Runnable barrierAction) {
}
```
其中parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
其中,`parties` 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
### CyclicBarrier 的应用场景
`CyclicBarrier` 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。
### CyclicBarrier 的使用示例
示例 1
```java
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
```
运行结果,如下:
```
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
```
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, `await()` 方法之后的方法才被执行。
另外,`CyclicBarrier` 还提供一个更高级的构造函数 `CyclicBarrier(int parties, Runnable barrierAction)`,用于在线程到达屏障时,优先执行 `barrierAction`,方便处理更复杂的业务场景。示例代码如下:
```java
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
*/
public class CyclicBarrierExample3 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
```
运行结果,如下:
```
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
```
### CyclicBarrier 源码分析
当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。
2、当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。
```java
public int await() throws InterruptedException, BrokenBarrierException {
@ -605,7 +390,7 @@ public int await() throws InterruptedException, BrokenBarrierException {
}
```
`dowait(false, 0L)`
`dowait(false, 0L)`方法源码分析如下
```java
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
@ -685,24 +470,163 @@ public int await() throws InterruptedException, BrokenBarrierException {
lock.unlock();
}
}
```
总结:`CyclicBarrier` 内部通过一个 count 变量作为计数器count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
#### 实战
### CyclicBarrier 和 CountDownLatch 的区别
示例 1
下面这个是国外一个大佬的回答:
```java
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample1 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
`CountDownLatch` 是计数器,只能使用一次,而 `CyclicBarrier` 的计数器提供 `reset` 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看javadoc 是这么描述它们的:
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
> CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;)
> CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
对于 `CountDownLatch` 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 `CyclicBarrier`,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
`CountDownLatch` 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 `CyclicBarrier` 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
}
```
### ReentrantLock 和 ReentrantReadWriteLock
运行结果,如下:
`ReentrantLock``synchronized` 的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 `ReentrantReadWriteLock` 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。
```
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
```
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, `await()` 方法之后的方法才被执行。
另外,`CyclicBarrier` 还提供一个更高级的构造函数 `CyclicBarrier(int parties, Runnable barrierAction)`,用于在线程到达屏障时,优先执行 `barrierAction`,方便处理更复杂的业务场景。
示例 2
```java
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
*/
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
```
运行结果,如下:
```
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
```

View File

@ -26,71 +26,7 @@ head:
- **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
### 实现 Runnable 接口和 Callable 接口的区别
`Runnable`自 Java 1.0 以来一直存在,但`Callable`仅在 Java 1.5 中引入,目的就是为了来处理`Runnable`不支持的用例。**`Runnable` 接口** 不会返回结果或抛出检查异常,但是 **`Callable` 接口** 可以。所以,如果任务不需要返回结果或抛出异常推荐使用 **`Runnable` 接口** ,这样代码看起来会更加简洁。
工具类 `Executors` 可以实现将 `Runnable` 对象转换成 `Callable` 对象。(`Executors.callable(Runnable task)``Executors.callable(Runnable task, Object result)`)。
`Runnable.java`
```java
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}
```
`Callable.java`
```java
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
```
### 执行 execute()方法和 submit()方法的区别是什么呢?
1. **`execute()`方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;**
2. **`submit()`方法用于提交需要返回值的任务。线程池会返回一个 `Future` 类型的对象,通过这个 `Future` 对象可以判断任务是否执行成功**,并且可以通过 `Future``get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `get(long timeoutTimeUnit unit)`方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
我们以 **`AbstractExecutorService` 接口** 中的一个 `submit` 方法为例子来看看源代码:
```java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
```
上面方法调用的 `newTaskFor` 方法返回了一个 `FutureTask` 对象。
```java
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
```
我们再来看看`execute()`方法:
```java
public void execute(Runnable command) {
...
}
```
### 如何创建线程池
### 如何创建线程池?
《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
@ -115,41 +51,36 @@ public void execute(Runnable command) {
![Executor框架的工具类](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/Executor框架的工具类.png)
### ThreadPoolExecutor 类分析
`ThreadPoolExecutor` 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
### 核心线程数和最大线程数有什么区别?
```java
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
```
**下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。**
#### `ThreadPoolExecutor`构造函数重要参数分析
**`ThreadPoolExecutor` 3 个最重要的参数:**
- **`corePoolSize` :** 核心线程数定义了最小可以同时运行的线程数量。
@ -163,9 +94,7 @@ public ThreadPoolExecutor(int corePoolSize,
3. **`threadFactory`** :executor 创建新线程的时候会用到。
4. **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。
#### `ThreadPoolExecutor` 饱和策略
**`ThreadPoolExecutor` 饱和策略定义:**
### 线程池的饱和策略有哪些?
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,`ThreadPoolTaskExecutor` 定义一些策略:
@ -176,373 +105,353 @@ public ThreadPoolExecutor(int corePoolSize,
举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)
### 一个简单的线程池 Demo
### 线程池原理是什么?
为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。
**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。**
首先创建一个 `Runnable` 接口的实现类(当然也可以是 `Callable` 接口,我们上面也说了两者的区别。)
`MyRunnable.java`
我们可以使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
```java
import java.util.Date;
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 这是一个简单的Runnable类需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {
private String command;
public MyRunnable(String s) {
this.command = s;
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}
public void execute(Runnable command) {
// 如果任务为null则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
}
@Override
public String toString() {
return this.command;
}
}
```
编写测试程序,我们这里以阿里巴巴推荐的使用 `ThreadPoolExecutor` 构造函数自定义参数的方式来创建线程池。
`ThreadPoolExecutorDemo.java`
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
//创建 MyRunnable 对象MyRunnable 类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
```
可以看到我们上面的代码指定了:
1. `corePoolSize`: 核心线程数为 5。
2. `maximumPoolSize` :最大线程数 10
3. `keepAliveTime` : 等待时间为 1L。
4. `unit`: 等待时间的单位为 TimeUnit.SECONDS。
5. `workQueue`:任务队列为 `ArrayBlockingQueue`,并且容量为 100;
6. `handler`:饱和策略为 `CallerRunsPolicy`
**Output**
```
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
```
### 线程池原理分析
承接 4.6 节,我们通过代码输出结果可以看出:**线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。** 在 4.6 节中的 Demo 中我们使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
```java
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
```
通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。
![图解线程池实现原理](./images/java-thread-pool-summary/图解线程池实现原理.png)
![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png)
现在,让我们在回到 4.6 节我们写的 Demo 现在是不是很容易就可以搞懂它的原理了呢?
**`addWorker` 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。**
没搞懂的话,也没关系,可以看看我的分析:
```java
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合存放线程池中所有的活跃的工作线程只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
> 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了线程池就会去拿新的任务执行。
/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core参数为false的话表明队列也满了线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP也就是RUNNING或者SHUTDOWN状态下同时传入的任务实例firstTask为null则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```
更多关于线程池源码分析的内容推荐这篇文章:硬核干货:[4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理](https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/)
### 如何设定线程池的大小?
**线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。**
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
> 上下文切换:
>
> 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用为了让这些线程都能得到有效执行CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。**任务从保存到再加载的过程就是一次上下文切换**。
>
> 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
>
> Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
**类比于现实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。**
**如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的 CPU 根本没有得到充分利用。**
**但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。**
有一个简单并且适用面比较广的公式:
- **CPU 密集型任务(N+1)** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 NCPU 核心数)+1比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断或者其它原因导致的任务暂停而带来的影响。一旦任务暂停CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- **I/O 密集型任务(2N)** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
**如何判断是 CPU 密集任务还是 IO 密集任务?**
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
## Atomic 原子类
### 介绍一下 Atomic 原子类
`Atomic` 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所以,所谓原子类说简单点就是具有原子/原子操作特征的类。
并发包 `java.util.concurrent` 的原子类都存放在`java.util.concurrent.atomic`下,如下图所示。
![JUC原子类概览](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/JUC原子类概览.png)
### JUC 包中的原子类是哪 4 类?
**基本类型**
使用原子的方式更新基本类型
- `AtomicInteger`:整型原子类
- `AtomicLong`:长整型原子类
- `AtomicBoolean`:布尔型原子类
**数组类型**
使用原子的方式更新数组里的某个元素
- `AtomicIntegerArray`:整型数组原子类
- `AtomicLongArray`:长整型数组原子类
- `AtomicReferenceArray`:引用类型数组原子类
**引用类型**
- `AtomicReference`:引用类型原子类
- `AtomicStampedReference`:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- `AtomicMarkableReference` :原子更新带有标记位的引用类型
**对象的属性修改类型**
- `AtomicIntegerFieldUpdater`:原子更新整型字段的更新器
- `AtomicLongFieldUpdater`:原子更新长整型字段的更新器
- `AtomicReferenceFieldUpdater`:原子更新引用类型字段的更新器
### 讲讲 AtomicInteger 的使用
**`AtomicInteger` 类常用方法**
```java
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值则以原子方式将该值设置为输入值update
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
```
**`AtomicInteger` 类的使用示例**
使用 `AtomicInteger` 之后,不用对 `increment()` 方法加锁也可以保证线程安全。
```java
class AtomicIntegerTest {
private AtomicInteger count = new AtomicInteger();
//使用AtomicInteger之后不需要对该方法加锁也可以实现线程安全。
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
```
### 能不能给我简单介绍一下 AtomicInteger 类的原理
- [浅谈AtomicInteger实现原理](https://github.com/summerHearts/JavaArchitecture/blob/master/Concurrent/浅谈AtomicInteger实现原理.md)
- [Java实现CAS的原理](https://tobebetterjavaer.com/thread/cas.html)
Atomic 原子类部分的内容我单独写了一篇文章来总结: [Atomic 原子类总结](./atomic-classes.md) 。
## AQS
### AQS 介绍
### AQS 是什么?
AQS 的全称为(`AbstractQueuedSynchronizer`),这个类在` java.util.concurrent.locks `包下面。
AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面。
![AQS类](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/AQS类.png)
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/AQS.png)
AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出大量应用广泛的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue``FutureTask` 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。
### AQS 原理分析
AQS 原理这部分参考了部分博客,在 6.2 节末尾放了链接。
> 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参加,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。
#### AQS 原理概览
**AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。**
> CLH(Craig,Landin and Hagersten)队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。
看个 AQS(AbstractQueuedSynchronizer)原理图:
![AQS原理图](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/AQS原理图.png)
AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
AQS 就是一个抽象类,主要用来构建锁和同步器。
```java
private volatile int state;//共享变量使用volatile修饰保证线程可见性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
```
状态信息通过 protected 类型的 getStatesetStatecompareAndSetState 进行操作
AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
### AQS 的原理是什么?
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁** 实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列虚拟的双向队列即不存在队列实例仅存在结点之间的关联关系。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点Node来实现锁的分配。在 CLH 同步队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
CLH 队列结构如下图所示:
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/40cb932a64694262993907ebda6a0bfe~tplv-k3u1fbpfcp-zoom-1.image)
AQS(`AbstractQueuedSynchronizer`)的核心原理图(图源[Java 并发之 AQS 详解](https://www.cnblogs.com/waterystone/p/4920797.html))如下:
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png)
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **线程等待队列** 来完成获取资源线程的排队工作。
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
```java
// 共享变量使用volatile修饰保证线程可见性
private volatile int state;
```
另外,状态信息 `state` 可以通过 `protected` 类型的`getState()``setState()``compareAndSetState()` 进行操作。并且,这几个方法都是 `final` 修饰的,在子类中无法被重写。
```java
//返回同步状态的当前值
protected final int getState() {
return state;
return state;
}
//设置同步状态的值
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
state = newState;
}
//原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```
#### AQS 对资源的共享方式
`ReentrantLock` 为例,`state` 初始值为 0表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()``state=`0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
**AQS 定义两种资源共享方式**
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。
- **Exclusive**(独占):只有一个线程能执行,如 `ReentrantLock`。又可分为公平锁和非公平锁:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
- **Share**(共享):多个线程可同时执行,如` CountDownLatch``Semaphore``CyclicBarrier``ReadWriteLock` 我们都会在后面讲到。
### Semaphore 有什么用?
`ReentrantReadWriteLock` 可以看成是组合式,因为 `ReentrantReadWriteLock` 也就是读写锁允许多个线程同时对某一资源进行读
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等AQS 已经在顶层实现好了
Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到贡献资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
#### AQS 底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
```java
protected boolean tryAcquire(int)//独占方式。尝试获取资源成功则返回true失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源成功则返回true失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源成功则返回true失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
```java
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
```
**什么是钩子方法呢?** 钩子方法是一种被声明在抽象类中的方法,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现
当初始的资源个数为 1 的时候,`Semaphore` 退化为排他锁。
除了上面提到的钩子方法之外AQS 类中的其他方法都是 `final` ,所以无法被其他类重写
`Semaphore` 有两种模式:。
`ReentrantLock` 为例state 初始化为 0表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()``state=`0即释放锁为止其它线程才有机会获取该锁。当然释放锁之前A 线程自己是可以重复获取此锁的state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO
- **非公平模式:** 抢占式的。
再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行state 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后` countDown()` 一次state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。
`Semaphore` 对应的两个构造方法如下:
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
```java
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
推荐两篇 AQS 原理和相关源码分析的文章:
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
- https://www.cnblogs.com/waterystone/p/4920797.html
- https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html
**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。**
### AQS 组件总结
`Semaphore` 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。
- **`Semaphore`(信号量)-允许多个线程同时访问:** `synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。
- **`CountDownLatch `(倒计时器):** `CountDownLatch` 是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
- **`CyclicBarrier`(循环栅栏)** `CyclicBarrier``CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。`CyclicBarrier` 的字面意思是可循环使用(`Cyclic`)的屏障(`Barrier`)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。
### Semaphore 的原理是什么?
`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state >= 0` 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果 `state<0` 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。
```java
/**
* 获取1个许可证
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证arg为获取许可证个数当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
```
调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state>=0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。
```java
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}
// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}
```
### CountDownLatch 有什么用?
`CountDownLatch` 允许 `count` 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。
### CountDownLatch 的原理是什么?
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
### 用过 CountDownLatch 么?什么场景下用的?
@ -623,12 +532,143 @@ List<CompletableFuture<String>> fileFutures = filePaths.stream()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);
```
### CyclicBarrier 有什么用?
`CyclicBarrier``CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。
> `CountDownLatch` 的实现是基于 AQS 的,而 `CycliBarrier` 是基于 `ReentrantLock`(`ReentrantLock` 也属于 AQS 同步器)和 `Condition` 的。
`CyclicBarrier` 的字面意思是可循环使用Cyclic的屏障Barrier。它要做的事情是让一组线程到达一个屏障也可以叫同步点时被阻塞直到最后一个线程到达屏障时屏障才会开门所有被屏障拦截的线程才会继续干活。
### CyclicBarrier 的原理是什么?
`CyclicBarrier` 内部通过一个 `count` 变量作为计数器,`count` 的初始值为 `parties` 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
```java
//每次拦截的线程数
private final int parties;
//计数器
private int count;
```
下面我们结合源码来简单看看。
1、`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。
```java
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
```
其中,`parties` 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
2、当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。
```java
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
```
`dowait(false, 0L)`方法源码分析如下:
```java
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
```
## 参考
- 《深入理解 Java 虚拟机》
- 《实战 Java 高并发程序设计》
- Java并发之AQS详解https://www.cnblogs.com/waterystone/p/4920797.html
- Java并发包基石-AQS详解https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html
- Java 并发之 AQS 详解https://www.cnblogs.com/waterystone/p/4920797.html
- Java 并发包基石-AQS 详解https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

View File

@ -5,11 +5,9 @@ tag:
- Java并发
---
## 1 使用线程池的好处
## 一 使用线程池的好处
> **池化技术想必大家已经屡见不鲜了线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。**
池化技术想必大家已经屡见不鲜了线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
**线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量。
@ -19,7 +17,7 @@ tag:
- **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
## Executor 框架
## 2 Executor 框架
### 2.1 简介
@ -76,7 +74,7 @@ public class ScheduledThreadPoolExecutor
3. **如果执行 `ExecutorService.submit``ExecutorService` 将返回一个实现`Future`接口的对象**(我们刚刚也提到过了执行 `execute()`方法和 `submit()`方法的区别,`submit()`会返回一个 `FutureTask 对象)。由于 FutureTask` 实现了 `Runnable`,我们也可以创建 `FutureTask`,然后直接交给 `ExecutorService` 执行。
4. **最后,主线程可以执行 `FutureTask.get()`方法来等待任务执行完成。主线程也可以执行 `FutureTask.cancelboolean mayInterruptIfRunning`来取消此任务的执行。**
## (重要)ThreadPoolExecutor 类简单介绍
## 3 (重要)ThreadPoolExecutor 类简单介绍
**线程池实现类 `ThreadPoolExecutor``Executor` 框架最核心的类。**
@ -173,7 +171,7 @@ public class ScheduledThreadPoolExecutor
![通过Executor 框架的工具类Executors来实现](./images/java-thread-pool-summary/Executors工具类.png)
## ThreadPoolExecutor 使用+原理分析
## 4 ThreadPoolExecutor 使用+原理分析
我们上面讲解了 `Executor`框架以及 `ThreadPoolExecutor` 类,下面让我们实战一下,来通过写一个 `ThreadPoolExecutor` 的小 Demo 来回顾上面的内容。
@ -657,7 +655,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
```
## 几种常见的线程池详解
## 5 几种常见的线程池详解
### 5.1 FixedThreadPool
@ -798,7 +796,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
`CachedThreadPool`允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
## ScheduledThreadPoolExecutor 详解
## 6 ScheduledThreadPoolExecutor 详解
**`ScheduledThreadPoolExecutor` 主要用来在给定的延迟后运行任务,或者定期执行任务。** 这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下它的思想即可。
@ -840,7 +838,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
3. 线程 1 修改 `ScheduledFutureTask` 的 time 变量为下次将要被执行的时间;
4. 线程 1 把这个修改 time 之后的 `ScheduledFutureTask` 放回 `DelayQueue` 中(`DelayQueue.add()`)。
## 线程池大小确定
## 7 线程池大小确定
**线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。**
@ -869,14 +867,9 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
## 参考
## 8 参考
- 《Java 并发编程的艺术》
- [Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example](https://www.journaldev.com/2340/java-scheduler-scheduledexecutorservice-scheduledthreadpoolexecutor-example "Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example")
- [java.util.concurrent.ScheduledThreadPoolExecutor Example](https://examples.javacodegeeks.com/core-java/util/concurrent/scheduledthreadpoolexecutor/java-util-concurrent-scheduledthreadpoolexecutor-example/ "java.util.concurrent.ScheduledThreadPoolExecutor Example")
- [ThreadPoolExecutor Java Thread Pool Example](https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice "ThreadPoolExecutor Java Thread Pool Example")
## 九 其他推荐阅读
- [Java 并发(三)线程池原理](https://www.cnblogs.com/warehouse/p/10720781.html "Java并发线程池原理")
- [如何优雅的使用和理解线程池](https://github.com/crossoverJie/JCSprout/blob/master/MD/ThreadPoolExecutor.md "如何优雅的使用和理解线程池")

View File

@ -157,7 +157,8 @@ JWT 认证的话,我们应该如何解决续签问题呢?查阅了很多资
- 需要客户端来配合;
- 用户注销的时候需要同时保证两个 JWT 都无效;
- 重新请求获取 JWT 的过程中会有短暂 JWT 不可用的情况(可以通过在客户端设置定时器,当 accessJWT 快过期的时候,提前去通过 refreshJWT 获取新的 accessJWT
- 重新请求获取 JWT 的过程中会有短暂 JWT 不可用的情况(可以通过在客户端设置定时器,当 accessJWT 快过期的时候,提前去通过 refreshJWT 获取新的 accessJWT;
- 存在安全问题,只要拿到了未过期的 refreshJWT 就一直可以获取到 accessJWT。
## 总结