1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

[docs update]完善线程池相关的内容

This commit is contained in:
Guide 2023-02-07 11:03:15 +08:00
parent 3c9d023d94
commit c706bebb0d
8 changed files with 415 additions and 590 deletions

View File

@ -64,7 +64,7 @@ _这本书还是非常适合我们用来学习 Java 多线程的。这本书的
这本书的质量也是非常过硬!给作者们点个赞!这本书有统一的排版规则和语言风格、清晰的表达方式和逻辑。并且每篇文章初稿写完后,作者们就会互相审校,合并到主分支时所有成员会再次审校,最后再通篇修订了三遍。 这本书的质量也是非常过硬!给作者们点个赞!这本书有统一的排版规则和语言风格、清晰的表达方式和逻辑。并且每篇文章初稿写完后,作者们就会互相审校,合并到主分支时所有成员会再次审校,最后再通篇修订了三遍。
在线阅读https://redspider.gitbook.io/concurrent/ 。 在线阅读:[https://redspider.gitbook.io/concurrent/](https://redspider.gitbook.io/concurrent/ )
**[《Java 并发实现原理JDK 源码剖析》](https://book.douban.com/subject/35013531/)** **[《Java 并发实现原理JDK 源码剖析》](https://book.douban.com/subject/35013531/)**

View File

@ -204,9 +204,9 @@ SELECT id FROM table WHERE id=1;
### 最左前缀匹配原则 ### 最左前缀匹配原则
最左前缀匹配原则指的是,在使用联合索引时,**MySQL** 会根据联合索引中的字段顺序,从左到右依次到查询条件中去匹配,如果查询条件中存在与联合索引中最左侧字段相匹配的字段,则就会使用该字段过滤一批数据,直至联合索引中全部字段匹配完成,或者在执行过程中遇到范围查询,如 **`>`**、**`<`**、**`between`** 和 **`以%开头的like查询`** 等条件,才会停止匹配 最左前缀匹配原则指的是,在使用联合索引时,**MySQL** 会根据联合索引中的字段顺序,从左到右依次到查询条件中去匹配,如果查询条件中存在与联合索引中最左侧字段相匹配的字段,则就会使用该字段过滤一批数据,直至联合索引中全部字段匹配完成,或者在执行过程中遇到范围查询(如 **`>`**、**`<`**)才会停止匹配。对于 **`>=`**、**`<=`**、**`BETWEEN`**、**`like`** 前缀匹配的范围查询,并不会停止匹配。所以,我们在使用联合索引时,可以将区分度高的字段放在最左边,这也可以过滤更多数据
所以,我们在使用联合索引时,可以将区分度高的字段放在最左边,这也可以过滤更多数据 相关阅读:[联合索引的最左匹配原则全网都在说的一个错误结论](https://mp.weixin.qq.com/s/8qemhRg5MgXs1So5YCv0fQ)
## 索引下推 ## 索引下推

View File

@ -109,7 +109,7 @@ public class MultiThread {
- **同步** 发出一个调用之后,在没有得到结果之前, 该调用就不可以返回,一直等待。 - **同步** 发出一个调用之后,在没有得到结果之前, 该调用就不可以返回,一直等待。
- **异步** :调用在发出之后,不用等待返回结果,该调用直接返回。 - **异步** :调用在发出之后,不用等待返回结果,该调用直接返回。
## 为什么要使用多线程? ## 为什么要使用多线程?
先从总体上来说: 先从总体上来说:

View File

@ -14,9 +14,13 @@ head:
## 线程池 ## 线程池
### 什么是线程池?
顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。
### 为什么要用线程池? ### 为什么要用线程池?
> **池化技术想必大家已经屡见不鲜了线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。** 池化技术想必大家已经屡见不鲜了线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
**线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量。 **线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量。
@ -28,30 +32,70 @@ head:
### 如何创建线程池? ### 如何创建线程池?
《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险 **方式一:通过`ThreadPoolExecutor`构造函数实现(推荐)**
> Executors 返回线程池对象的弊端如下: ![通过构造方法实现](./images/java-thread-pool-summary/threadpoolexecutor构造函数.png)
>
> - **FixedThreadPool 和 SingleThreadExecutor** 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
> - **CachedThreadPool 和 ScheduledThreadPool** 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
**方式一:通过构造方法实现** **方式二:通过 `Executor` 框架的工具类 `Executors` 来实现**
我们可以创建三种类型的 `ThreadPoolExecutor`
![ThreadPoolExecutor构造方法](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/ThreadPoolExecutor构造方法.png) - **`FixedThreadPool`** 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
- **`SingleThreadExecutor`** 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
**方式二:通过 Executor 框架的工具类 Executors 来实现** - **`CachedThreadPool`** 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
我们可以创建三种类型的 ThreadPoolExecutor
- **FixedThreadPool** 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
- **SingleThreadExecutor** 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
- **CachedThreadPool** 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
对应 Executors 工具类中的方法如图所示: 对应 Executors 工具类中的方法如图所示:
![Executor框架的工具类](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/Executor框架的工具类.png) ![通过Executor 框架的工具类Executors来实现](./images/java-thread-pool-summary/Executors工具类.png)
### 核心线程数和最大线程数有什么区别? ### 为什么不推荐使用内置线程池?
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
**为什么呢?**
> 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 `Executors` 去创建,而是通过 `ThreadPoolExecutor` 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
`Executors` 返回线程池对象的弊端如下(后文会详细介绍到)
- **`FixedThreadPool``SingleThreadExecutor`** 使用的是无界的 `LinkedBlockingQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
- **`CachedThreadPool`** :使用的是同步队列 `SynchronousQueue`, 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
- **`ScheduledThreadPool``SingleThreadScheduledExecutor` ** : 使用的无界的延迟阻塞队列`DelayedWorkQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
```java
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
// 同步队列 SynchronousQueue没有容量最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
// DelayedWorkQueue延迟阻塞队列
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
```
### 线程池常见参数有哪些?如何解释?
```java ```java
/** /**
@ -83,16 +127,20 @@ head:
**`ThreadPoolExecutor` 3 个最重要的参数:** **`ThreadPoolExecutor` 3 个最重要的参数:**
- **`corePoolSize` :** 核心线程数定义了最小可以同时运行的线程数量。 - **`corePoolSize` :** 任务队列未达到队列容量时,最大可以同时运行的线程数量。
- **`maximumPoolSize` :** 队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。 - **`maximumPoolSize` :** 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- **`workQueue`:** 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。 - **`workQueue`:** 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
`ThreadPoolExecutor`其他常见参数: `ThreadPoolExecutor`其他常见参数 :
1. **`keepAliveTime`**:当线程池中的线程数量大于 `corePoolSize` 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 `keepAliveTime`才会被回收销毁; - **`keepAliveTime`**:线程池中的线程数量大于 `corePoolSize` 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 `keepAliveTime`才会被回收销毁;
2. **`unit`** : `keepAliveTime` 参数的时间单位。 - **`unit`** : `keepAliveTime` 参数的时间单位。
3. **`threadFactory`** :executor 创建新线程的时候会用到。 - **`threadFactory`** :executor 创建新线程的时候会用到。
4. **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。 - **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解图片来源《Java 性能调优实战》):
![线程池各个参数的关系](./images/java-thread-pool-summary/线程池各个参数之间的关系.png)
### 线程池的饱和策略有哪些? ### 线程池的饱和策略有哪些?
@ -105,170 +153,77 @@ head:
举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了) 举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)
### 线程池原理是什么 ### 线程池常用的阻塞队列有哪些
**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。** 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
我们可以使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: 不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。
```java - 容量为 `Integer.MAX_VALUE``LinkedBlockingQueue`(无界队列):`FixedThreadPool``SingleThreadExector` 。由于队列永远不会被放满,因此`FixedThreadPool`最多只能创建核心线程数的线程。
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) - `SynchronousQueue`(同步队列) `CachedThreadPool``SynchronousQueue` 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,`CachedThreadPool` 的最大线程数是 `Integer.MAX_VALUE` ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); - `DelayedWorkQueue`(延迟阻塞队列):`ScheduledThreadPool``SingleThreadScheduledExecutor``DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容原来容量的 1/2即永远不会阻塞最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。
private static int workerCountOf(int c) { ### 线程池处理任务的流程了解吗?
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任务为null则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
```
通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。
![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png) ![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png)
**`addWorker` 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。** 1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用`RejectedExecutionHandler.rejectedExecution()`方法。
### 如何给线程池命名?
初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。
默认情况下创建的线程名字类似 `pool-1-thread-n` 这样的,没有业务含义,不利于我们定位问题。
给线程池里的线程命名通常有下面两种方式:
**1、利用 guava 的 `ThreadFactoryBuilder` **
```java ```java
// 全局锁,并发操作必备 ThreadFactory threadFactory = new ThreadFactoryBuilder()
private final ReentrantLock mainLock = new ReentrantLock(); .setNameFormat(threadNamePrefix + "-%d")
// 跟踪线程池的最大大小只有在持有全局锁mainLock的前提下才能访问此集合 .setDaemon(true).build();
private int largestPoolSize; ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
// 工作线程集合存放线程池中所有的活跃的工作线程只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core参数为false的话表明队列也满了线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP也就是RUNNING或者SHUTDOWN状态下同时传入的任务实例firstTask为null则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
``` ```
更多关于线程池源码分析的内容推荐这篇文章:硬核干货:[4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理](https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/) **2、自己实现 `ThreadFactor`。**
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/**
* 创建一个带名字的线程池生产工厂
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name; // TODO consider uniquifying this
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
```
### 如何设定线程池的大小? ### 如何设定线程池的大小?
**线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。**
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。 很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
> 上下文切换: > 上下文切换:
@ -279,21 +234,67 @@ head:
> >
> Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。 > Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
**类比于实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。** 类比于实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
**如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的 CPU 根本没有得到充分利用。** - 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的CPU 根本没有得到充分利用。
- 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
**但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。**
有一个简单并且适用面比较广的公式: 有一个简单并且适用面比较广的公式:
- **CPU 密集型任务(N+1)** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 NCPU 核心数)+1比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断或者其它原因导致的任务暂停而带来的影响。一旦任务暂停CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。 - **CPU 密集型任务(N+1)** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 NCPU 核心数)+1比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断或者其它原因导致的任务暂停而带来的影响。一旦任务暂停CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- **I/O 密集型任务(2N)** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 - **I/O 密集型任务(2N)** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
**如何判断是 CPU 密集任务还是 IO 密集任务?** **如何判断是 CPU 密集任务还是 IO 密集任务?**
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。 CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
> 🌈 拓展一下(参见:[issue#1737](https://github.com/Snailclimb/JavaGuide/issues/1737)
>
> 线程数更严谨的计算的方法应该是:`最佳线程数 = NCPU 核心数1+WT线程等待时间/ST线程计算时间`,其中 `WT线程等待时间=线程运行总时间 - ST线程计算时间`
>
> 线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。
>
> 我们可以通过 JDK 自带的工具 VisualVM 来查看 `WT/ST` 比例。
>
> CPU 密集型任务的 `WT/ST` 接近或者等于 0因此 线程数可以设置为 NCPU 核心数1+0= N和我们上面说的 NCPU 核心数)+1 差不多。
>
> IO 密集型任务下,几乎全是线程等待时间,从理论上来说,你就可以将线程数设置为 2N按道理来说WT/ST 的结果应该比较大,这里选择 2N 的原因应该是为了避免创建过多线程吧)。
公示也只是参考,具体还是要根据项目实际线上运行情况来动态调整。我在后面介绍的美团的线程池参数动态配置这种方案就非常不错,很实用!
### 如何动态修改线程池的参数?
美团技术团队在[《Java 线程池实现原理及其在美团业务中的实践》](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。
美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:
- **`corePoolSize` :** 核心线程数线程数定义了最小可以同时运行的线程数量。
- **`maximumPoolSize` :** 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- **`workQueue`:** 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
**为什么是这三个参数?**
我在[Java 线程池详解](https://javaguide.cn/java/concurrent/java-thread-pool-summary.html) 这篇文章中就说过这三个参数是 `ThreadPoolExecutor` 最重要的参数,它们基本决定了线程池对于任务的处理策略。
**如何支持参数动态配置?** 且看 `ThreadPoolExecutor` 提供的下面这些方法。
![](./images/thread-pool/b6fd95a7-4c9d-4fc6-ad26-890adb3f6c4c.png)
格外需要注意的是`corePoolSize` 程序运行期间的时候,我们调用 `setCorePoolSize`这个方法的话,线程池会首先判断当前工作线程数是否大于`corePoolSize`,如果大于的话就会回收工作线程。
另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 `ResizableCapacityLinkedBlockIngQueue` 的队列(主要就是把`LinkedBlockingQueue`的 capacity 字段的 final 关键字修饰给去掉了,让它变为可变的)。
最终实现的可动态修改线程池参数效果如下。👏👏👏
![动态配置线程池参数最终效果](./images/thread-pool/19a0255a-6ef3-4835-98d1-a839d1983332.png)
还没看够?推荐 why 神的[如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。](https://mp.weixin.qq.com/s/9HLuPcoWmTqAeFKa1kj-_A)这篇文章,深度剖析,很不错哦!
如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:
- **[Hippo-4](https://github.com/opengoofy/hippo4j)** :一款强大的动态线程池框架,解决了传统线程池使用存在的一些痛点比如线程池参数没办法动态修改、不支持运行时变量的传递、无法执行优雅关闭。除了支持动态修改线程池参数、线程池任务传递上下文,还支持通知报警、运行监控等开箱即用的功能。
- **[Dynamic TP](https://github.com/dromara/dynamic-tp)** 轻量级动态线程池内置监控告警功能集成三方中间件线程池管理基于主流配置中心已支持Nacos、ApolloZookeeper、Consul、Etcd可通过SPI自定义实现
## AQS ## AQS
### AQS 是什么? ### AQS 是什么?
@ -361,7 +362,7 @@ protected final boolean compareAndSetState(int expect, int update) {
Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
```java ```java
// 初始共享资源数量 // 初始共享资源数量
final Semaphore semaphore = new Semaphore(5); final Semaphore semaphore = new Semaphore(5);
// 获取1个许可 // 获取1个许可
@ -666,5 +667,7 @@ public int await() throws InterruptedException, BrokenBarrierException {
- 《深入理解 Java 虚拟机》 - 《深入理解 Java 虚拟机》
- 《实战 Java 高并发程序设计》 - 《实战 Java 高并发程序设计》
- 带你了解下 SynchronousQueue并发队列专题https://juejin.cn/post/7031196740128768037
- 阻塞队列 — DelayedWorkQueue 源码分析https://zhuanlan.zhihu.com/p/310621485
- Java 并发之 AQS 详解https://www.cnblogs.com/waterystone/p/4920797.html - Java 并发之 AQS 详解https://www.cnblogs.com/waterystone/p/4920797.html
- Java 并发包基石-AQS 详解https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html - Java 并发包基石-AQS 详解https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

View File

@ -5,141 +5,17 @@ tag:
- Java并发 - Java并发
--- ---
这篇文章篇幅虽短,但是绝对是干货。标题稍微有点夸张,嘿嘿,实际都是自己使用线程池的时候总结的一些个人感觉比较重要的点。
## 线程池知识回顾
开始这篇文章之前还是简单介绍线程池,之前写的[《新手也能看懂的线程池学习总结》](./java-thread-pool-summary.md)这篇文章介绍的很详细了。
### 为什么要使用线程池?
> **池化技术想必大家已经屡见不鲜了线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。**
**线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下**使用线程池的好处**
- **降低资源消耗**。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
### 线程池在实际项目的使用场景
**线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。**
假设我们要执行三个不相关的耗时任务Guide 画图给大家展示了使用线程池前后的区别。
注意:**下面三个任务可能做的是同一件事情,也可能是不一样的事情。**
> 使用多线程前应为:任务 1 --> 任务 2 --> 任务 3图中把任务 3 画错为 任务 2
![使用线程池前后对比](./images/thread-pool/1bc44c67-26ba-42ab-bcb8-4e29e6fd99b9.png)
### 如何使用线程池?
一般是通过 `ThreadPoolExecutor` 的构造函数来创建线程池,然后提交任务给线程池执行就可以了。
`ThreadPoolExecutor`构造函数如下:
```java
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
```
简单演示一下如何使用线程池,更详细的介绍,请看:[《新手也能看懂的线程池学习总结》](https://mp.weixin.qq.com/s?__biz=Mzg2OTA0Njk0OA==&mid=2247485808&idx=1&sn=1013253533d73450cef673aee13267ab&chksm=cea246bbf9d5cfad1c21316340a0ef1609a7457fea4113a1f8d69e8c91e7d9cd6285f5ee1490&token=510053261&lang=zh_CN&scene=21#wechat_redirect) 。
```java
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date" + Instant.now());
});
}
//终止线程池
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished all threads");
}
```
控制台输出:
```java
CurrentThread name:pool-1-thread-5date2020-06-06T11:45:31.639Z
CurrentThread name:pool-1-thread-3date2020-06-06T11:45:31.639Z
CurrentThread name:pool-1-thread-1date2020-06-06T11:45:31.636Z
CurrentThread name:pool-1-thread-4date2020-06-06T11:45:31.639Z
CurrentThread name:pool-1-thread-2date2020-06-06T11:45:31.639Z
CurrentThread name:pool-1-thread-2date2020-06-06T11:45:33.656Z
CurrentThread name:pool-1-thread-4date2020-06-06T11:45:33.656Z
CurrentThread name:pool-1-thread-1date2020-06-06T11:45:33.656Z
CurrentThread name:pool-1-thread-3date2020-06-06T11:45:33.656Z
CurrentThread name:pool-1-thread-5date2020-06-06T11:45:33.656Z
Finished all threads
```
## 线程池最佳实践
简单总结一下我了解的使用线程池的时候应该注意的东西,网上似乎还没有专门写这方面的文章。 简单总结一下我了解的使用线程池的时候应该注意的东西,网上似乎还没有专门写这方面的文章。
因为 Guide 还比较菜,有补充和完善的地方,可以在评论区告知或者在微信上与我交流。 ## 1、正确声明线程池
### 1. 使用 `ThreadPoolExecutor` 的构造函数声明线程池 **线程池必须手动通过 `ThreadPoolExecutor` 的构造函数来声明,避免使用`Executors` 类创建线程池,会有 OOM 风险。**
**1. 线程池必须手动通过 `ThreadPoolExecutor` 的构造函数来声明,避免使用`Executors` 类的 `newFixedThreadPool``newCachedThreadPool` ,因为可能会有 OOM 的风险。** `Executors` 返回线程池对象的弊端如下(后文会详细介绍到)
> Executors 返回线程池对象的弊端如下: - **`FixedThreadPool``SingleThreadExecutor`** 使用的是无界的 `LinkedBlockingQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
> - **`CachedThreadPool`** :使用的是同步队列 `SynchronousQueue`, 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
> - **`FixedThreadPool``SingleThreadExecutor`** 允许请求的队列长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。 - **`ScheduledThreadPool``SingleThreadScheduledExecutor` ** : 使用的无界的延迟阻塞队列`DelayedWorkQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
> - **CachedThreadPool 和 ScheduledThreadPool** 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
说白了就是:**使用有界队列,控制线程创建数量。** 说白了就是:**使用有界队列,控制线程创建数量。**
@ -148,7 +24,7 @@ Finished all threads
1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。 1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。 2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。
### 2.监测线程池运行状态 ## 2、监测线程池运行状态
你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。 你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。
@ -159,25 +35,25 @@ Finished all threads
下面是一个简单的 Demo。`printThreadPoolStatus()`会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。 下面是一个简单的 Demo。`printThreadPoolStatus()`会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。
```java ```java
/** /**
* 打印线程池的状态 * 打印线程池的状态
* *
* @param threadPool 线程池对象 * @param threadPool 线程池对象
*/ */
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) { public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false)); ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
scheduledExecutorService.scheduleAtFixedRate(() -> { scheduledExecutorService.scheduleAtFixedRate(() -> {
log.info("========================="); log.info("=========================");
log.info("ThreadPool Size: [{}]", threadPool.getPoolSize()); log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
log.info("Active Threads: {}", threadPool.getActiveCount()); log.info("Active Threads: {}", threadPool.getActiveCount());
log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount()); log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size()); log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("========================="); log.info("=========================");
}, 0, 1, TimeUnit.SECONDS); }, 0, 1, TimeUnit.SECONDS);
} }
``` ```
### 3.建议不同类别的业务用不同的线程池 ## 3、建议不同类别的业务用不同的线程池
很多人在实际项目中都会有类似这样的问题:**我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?** 很多人在实际项目中都会有类似这样的问题:**我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?**
@ -195,15 +71,15 @@ Finished all threads
解决方法也很简单,就是新增加一个用于执行子任务的线程池专门为其服务。 解决方法也很简单,就是新增加一个用于执行子任务的线程池专门为其服务。
### 4.别忘记给线程池命名 ## 4、别忘记给线程池命名
初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。 初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。
默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。 默认情况下创建的线程名字类似 `pool-1-thread-n` 这样的,没有业务含义,不利于我们定位问题。
给线程池里的线程命名通常有下面两种方式: 给线程池里的线程命名通常有下面两种方式:
**1.利用 guava 的 `ThreadFactoryBuilder` ** **1利用 guava 的 `ThreadFactoryBuilder` **
```java ```java
ThreadFactory threadFactory = new ThreadFactoryBuilder() ThreadFactory threadFactory = new ThreadFactoryBuilder()
@ -212,7 +88,7 @@ ThreadFactory threadFactory = new ThreadFactoryBuilder()
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory) ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
``` ```
**2.自己实现 `ThreadFactor`。** **2自己实现 `ThreadFactor`。**
```java ```java
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -245,13 +121,13 @@ public final class NamingThreadFactory implements ThreadFactory {
} }
``` ```
### 5.正确配置线程池参数 ## 5、正确配置线程池参数
说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)! 说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)!
我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考! 我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考!
#### 常规操作 ### 常规操作
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。 很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
@ -291,7 +167,7 @@ CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内
**公示也只是参考,具体还是要根据项目实际线上运行情况来动态调整。我在后面介绍的美团的线程池参数动态配置这种方案就非常不错,很实用!** **公示也只是参考,具体还是要根据项目实际线上运行情况来动态调整。我在后面介绍的美团的线程池参数动态配置这种方案就非常不错,很实用!**
#### 美团的骚操作 ### 美团的骚操作
美团技术团队在[《Java 线程池实现原理及其在美团业务中的实践》](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。 美团技术团队在[《Java 线程池实现原理及其在美团业务中的实践》](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。
@ -318,3 +194,8 @@ CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内
![动态配置线程池参数最终效果](./images/thread-pool/19a0255a-6ef3-4835-98d1-a839d1983332.png) ![动态配置线程池参数最终效果](./images/thread-pool/19a0255a-6ef3-4835-98d1-a839d1983332.png)
还没看够?推荐 why 神的[《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》](https://mp.weixin.qq.com/s/9HLuPcoWmTqAeFKa1kj-_A)这篇文章,深度剖析,很不错哦! 还没看够?推荐 why 神的[《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》](https://mp.weixin.qq.com/s/9HLuPcoWmTqAeFKa1kj-_A)这篇文章,深度剖析,很不错哦!
如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:
- **[Hippo-4](https://github.com/opengoofy/hippo4j)** :一款强大的动态线程池框架,解决了传统线程池使用存在的一些痛点比如线程池参数没办法动态修改、不支持运行时变量的传递、无法执行优雅关闭。除了支持动态修改线程池参数、线程池任务传递上下文,还支持通知报警、运行监控等开箱即用的功能。
- **[Dynamic TP](https://github.com/dromara/dynamic-tp)** 轻量级动态线程池内置监控告警功能集成三方中间件线程池管理基于主流配置中心已支持Nacos、ApolloZookeeper、Consul、Etcd可通过SPI自定义实现

View File

@ -5,50 +5,62 @@ tag:
- Java并发 - Java并发
--- ---
## 1 使用线程池的好处
池化技术想必大家已经屡见不鲜了线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。 池化技术想必大家已经屡见不鲜了线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
**线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量 这篇文章我会详细介绍一下线程池的基本概念以及核心原理
这里借用《Java 并发编程的艺术》提到的来说一下**使用线程池的好处** ## 线程池介绍
顾名思义,线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》书中的部分内容来总结一下使用线程池的好处:
- **降低资源消耗**。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 - **降低资源消耗**。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。 - **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 - **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
## 2 Executor 框架 **线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。**
### 2.1 简介 假设我们要执行三个不相关的耗时任务,画图给大家展示了使用线程池前后的区别。
注意:**下面三个任务可能做的是同一件事情,也可能是不一样的事情。**
> 使用多线程前应为:任务 1 --> 任务 2 --> 任务 3图中把任务 3 画错为 任务 2
![使用线程池前后对比](./images/thread-pool/1bc44c67-26ba-42ab-bcb8-4e29e6fd99b9.png)
## Executor 框架介绍
`Executor` 框架是 Java5 之后引进的,在 Java 5 之后,通过 `Executor` 来启动线程比使用 `Thread``start` 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。 `Executor` 框架是 Java5 之后引进的,在 Java 5 之后,通过 `Executor` 来启动线程比使用 `Thread``start` 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
> 补充this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。 > this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
`Executor` 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,`Executor` 框架让并发编程变得更加简单。 `Executor` 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,`Executor` 框架让并发编程变得更加简单。
### 2.2 Executor 框架结构(主要由三大部分组成) `Executor` 框架结构主要由三大部分组成:
#### 1) 任务(`Runnable` /`Callable`) **1、任务(`Runnable` /`Callable`)**
执行任务需要实现的 **`Runnable` 接口** 或 **`Callable`接口**。**`Runnable` 接口**或 **`Callable` 接口** 实现类都可以被 **`ThreadPoolExecutor`** 或 **`ScheduledThreadPoolExecutor`** 执行。 执行任务需要实现的 **`Runnable` 接口** 或 **`Callable`接口**。**`Runnable` 接口**或 **`Callable` 接口** 实现类都可以被 **`ThreadPoolExecutor`** 或 **`ScheduledThreadPoolExecutor`** 执行。
#### 2) 任务的执行(`Executor`) **2、任务的执行(`Executor`)**
如下图所示,包括任务执行机制的核心接口 **`Executor`** ,以及继承自 `Executor` 接口的 **`ExecutorService` 接口。`ThreadPoolExecutor`** 和 **`ScheduledThreadPoolExecutor`** 这两个关键类实现了 **ExecutorService 接口** 如下图所示,包括任务执行机制的核心接口 **`Executor`** ,以及继承自 `Executor` 接口的 **`ExecutorService` 接口。`ThreadPoolExecutor`** 和 **`ScheduledThreadPoolExecutor`** 这两个关键类实现了 **`ExecutorService`** 接口
**这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 `ThreadPoolExecutor` 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。** ![任务的执行相关接口](./images/java-thread-pool-summary/任务的执行相关接口.png)
> **注意:** 通过查看 `ScheduledThreadPoolExecutor` 源代码我们发现 `ScheduledThreadPoolExecutor` 实际上是继承了 `ThreadPoolExecutor` 并实现了 ScheduledExecutorService ,而 `ScheduledExecutorService` 又实现了 `ExecutorService`,正如我们下面给出的类关系图显示的一样 这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 `ThreadPoolExecutor` 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的
**`ThreadPoolExecutor` 类描述:** **注意:** 通过查看 `ScheduledThreadPoolExecutor` 源代码我们发现 `ScheduledThreadPoolExecutor` 实际上是继承了 `ThreadPoolExecutor` 并实现了 `ScheduledExecutorService` ,而 `ScheduledExecutorService` 又实现了 `ExecutorService`,正如我们上面给出的类关系图显示的一样。
`ThreadPoolExecutor` 类描述:
```java ```java
//AbstractExecutorService实现了ExecutorService接口 //AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService public class ThreadPoolExecutor extends AbstractExecutorService
``` ```
**`ScheduledThreadPoolExecutor` 类描述:** `ScheduledThreadPoolExecutor` 类描述:
```java ```java
//ScheduledExecutorService继承ExecutorService接口 //ScheduledExecutorService继承ExecutorService接口
@ -57,28 +69,26 @@ public class ScheduledThreadPoolExecutor
implements ScheduledExecutorService implements ScheduledExecutorService
``` ```
![任务的执行相关接口](./images/java-thread-pool-summary/任务的执行相关接口.png) **3、异步计算的结果(`Future`)**
#### 3) 异步计算的结果(`Future`)
**`Future`** 接口以及 `Future` 接口的实现类 **`FutureTask`** 类都可以代表异步计算的结果。 **`Future`** 接口以及 `Future` 接口的实现类 **`FutureTask`** 类都可以代表异步计算的结果。
当我们把 **`Runnable`接口** 或 **`Callable` 接口** 的实现类提交给 **`ThreadPoolExecutor`** 或 **`ScheduledThreadPoolExecutor`** 执行。(调用 `submit()` 方法时会返回一个 **`FutureTask`** 对象) 当我们把 **`Runnable`接口** 或 **`Callable` 接口** 的实现类提交给 **`ThreadPoolExecutor`** 或 **`ScheduledThreadPoolExecutor`** 执行。(调用 `submit()` 方法时会返回一个 **`FutureTask`** 对象)
### 2.3 Executor 框架的使用示意图 **`Executor` 框架的使用示意图**
![Executor 框架的使用示意图](./images/java-thread-pool-summary/Executor框架的使用示意图.png) ![Executor 框架的使用示意图](./images/java-thread-pool-summary/Executor框架的使用示意图.png)
1. **主线程首先要创建实现 `Runnable` 或者 `Callable` 接口的任务对象。** 1. 主线程首先要创建实现 `Runnable` 或者 `Callable` 接口的任务对象。
2. **把创建完成的实现 `Runnable`/`Callable`接口的 对象直接交给 `ExecutorService` 执行**: `ExecutorService.executeRunnable command`)或者也可以把 `Runnable` 对象或`Callable` 对象提交给 `ExecutorService` 执行(`ExecutorService.submitRunnable task``ExecutorService.submitCallable <T> task`)。 2. 把创建完成的实现 `Runnable`/`Callable`接口的 对象直接交给 `ExecutorService` 执行: `ExecutorService.executeRunnable command`)或者也可以把 `Runnable` 对象或`Callable` 对象提交给 `ExecutorService` 执行(`ExecutorService.submitRunnable task``ExecutorService.submitCallable <T> task`)。
3. **如果执行 `ExecutorService.submit``ExecutorService` 将返回一个实现`Future`接口的对象**(我们刚刚也提到过了执行 `execute()`方法和 `submit()`方法的区别,`submit()`会返回一个 `FutureTask 对象)。由于 FutureTask` 实现了 `Runnable`,我们也可以创建 `FutureTask`,然后直接交给 `ExecutorService` 执行。 3. 如果执行 `ExecutorService.submit``ExecutorService` 将返回一个实现`Future`接口的对象(我们刚刚也提到过了执行 `execute()`方法和 `submit()`方法的区别,`submit()`会返回一个 `FutureTask 对象)。由于 FutureTask` 实现了 `Runnable`,我们也可以创建 `FutureTask`,然后直接交给 `ExecutorService` 执行。
4. **最后,主线程可以执行 `FutureTask.get()`方法来等待任务执行完成。主线程也可以执行 `FutureTask.cancelboolean mayInterruptIfRunning`来取消此任务的执行。** 4. 最后,主线程可以执行 `FutureTask.get()`方法来等待任务执行完成。主线程也可以执行 `FutureTask.cancelboolean mayInterruptIfRunning`来取消此任务的执行。
## 3 (重要)ThreadPoolExecutor 类简单介绍 ## ThreadPoolExecutor 类介绍(重要)
**线程池实现类 `ThreadPoolExecutor``Executor` 框架最核心的类。** 线程池实现类 `ThreadPoolExecutor``Executor` 框架最核心的类。
### 3.1 ThreadPoolExecutor 类分析 ### 构造方法介绍
`ThreadPoolExecutor` 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。 `ThreadPoolExecutor` 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。
@ -114,16 +124,16 @@ public class ScheduledThreadPoolExecutor
**`ThreadPoolExecutor` 3 个最重要的参数:** **`ThreadPoolExecutor` 3 个最重要的参数:**
- **`corePoolSize` :** 核心线程数线程数定义了最小可以同时运行的线程数量。 - **`corePoolSize` :** 任务队列未达到队列容量时,最大可以同时运行的线程数量。
- **`maximumPoolSize` :** 队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。 - **`maximumPoolSize` :** 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- **`workQueue`:** 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。 - **`workQueue`:** 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
`ThreadPoolExecutor`其他常见参数 : `ThreadPoolExecutor`其他常见参数 :
1. **`keepAliveTime`**:当线程池中的线程数量大于 `corePoolSize` 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 `keepAliveTime`才会被回收销毁 - **`keepAliveTime`**:线程池中的线程数量大于 `corePoolSize` 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 `keepAliveTime`才会被回收销毁
2. **`unit`** : `keepAliveTime` 参数的时间单位。 - **`unit`** : `keepAliveTime` 参数的时间单位。
3. **`threadFactory`** :executor 创建新线程的时候会用到。 - **`threadFactory`** :executor 创建新线程的时候会用到。
4. **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。 - **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解图片来源《Java 性能调优实战》): 下面这张图可以加深你对线程池中各个参数的相互关系的理解图片来源《Java 性能调优实战》):
@ -140,42 +150,80 @@ public class ScheduledThreadPoolExecutor
举个例子: 举个例子:
> Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了 Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)
### 3.2 推荐使用 `ThreadPoolExecutor` 构造函数创建线程池 ### 线程池创建两种方式
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
**为什么呢?**
> 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 `Executors` 去创建,而是通过 `ThreadPoolExecutor` 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
> `Executors` 返回线程池对象的弊端如下(后文会详细介绍到)
>
> - **`FixedThreadPool``SingleThreadExecutor`** 允许请求的队列长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
> - **`CachedThreadPool``ScheduledThreadPool`** 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
**方式一:通过`ThreadPoolExecutor`构造函数实现(推荐)** **方式一:通过`ThreadPoolExecutor`构造函数实现(推荐)**
![通过构造方法实现](./images/java-thread-pool-summary/threadpoolexecutor构造函数.png) ![通过构造方法实现](./images/java-thread-pool-summary/threadpoolexecutor构造函数.png)
**方式二:通过 `Executor` 框架的工具类 `Executors` 来实现** **方式二:通过 `Executor` 框架的工具类 `Executors` 来实现**
我们可以创建三种类型的 `ThreadPoolExecutor` 我们可以创建三种类型的 `ThreadPoolExecutor`
- `FixedThreadPool` - **`FixedThreadPool`** 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
- `SingleThreadExecutor` - **`SingleThreadExecutor`** 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
- CachedThreadPool - **`CachedThreadPool`** 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
对应 Executors 工具类中的方法如图所示: 对应 Executors 工具类中的方法如图所示:
![通过Executor 框架的工具类Executors来实现](./images/java-thread-pool-summary/Executors工具类.png) ![通过Executor 框架的工具类Executors来实现](./images/java-thread-pool-summary/Executors工具类.png)
## 4 ThreadPoolExecutor 使用+原理分析 《阿里巴巴 Java 开发手册》强制线程池不允许使用 `Executors` 去创建,而是通过 `ThreadPoolExecutor` 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
`Executors` 返回线程池对象的弊端如下(后文会详细介绍到)
- **`FixedThreadPool``SingleThreadExecutor`** 使用的是无界的 `LinkedBlockingQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
- **`CachedThreadPool`** :使用的是同步队列 `SynchronousQueue`, 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
- **`ScheduledThreadPool``SingleThreadScheduledExecutor` ** : 使用的无界的延迟阻塞队列`DelayedWorkQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。
```java
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
// 同步队列 SynchronousQueue没有容量最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
// DelayedWorkQueue延迟阻塞队列
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
```
### 线程池常用的阻塞队列总结
新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。
- 容量为 `Integer.MAX_VALUE``LinkedBlockingQueue`(无界队列):`FixedThreadPool``SingleThreadExector` 。由于队列永远不会被放满,因此`FixedThreadPool`最多只能创建核心线程数的线程。
- `SynchronousQueue`(同步队列) `CachedThreadPool``SynchronousQueue` 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,`CachedThreadPool` 的最大线程数是 `Integer.MAX_VALUE` ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
- `DelayedWorkQueue`(延迟阻塞队列):`ScheduledThreadPool``SingleThreadScheduledExecutor``DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容原来容量的 1/2即永远不会阻塞最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。
## 线程池原理分析(重要)
我们上面讲解了 `Executor`框架以及 `ThreadPoolExecutor` 类,下面让我们实战一下,来通过写一个 `ThreadPoolExecutor` 的小 Demo 来回顾上面的内容。 我们上面讲解了 `Executor`框架以及 `ThreadPoolExecutor` 类,下面让我们实战一下,来通过写一个 `ThreadPoolExecutor` 的小 Demo 来回顾上面的内容。
### 4.1 示例代码:`Runnable`+`ThreadPoolExecutor` ### ThreadPoolExecutor 示例代码
首先创建一个 `Runnable` 接口的实现类(当然也可以是 `Callable` 接口,我们上面也说了两者的区别。) 首先创建一个 `Runnable` 接口的实现类(当然也可以是 `Callable` 接口,我们上面也说了两者的区别。)
@ -264,14 +312,14 @@ public class ThreadPoolExecutorDemo {
可以看到我们上面的代码指定了: 可以看到我们上面的代码指定了:
1. `corePoolSize`: 核心线程数为 5。 - `corePoolSize`: 核心线程数为 5。
2. `maximumPoolSize` :最大线程数 10 - `maximumPoolSize` :最大线程数 10
3. `keepAliveTime` : 等待时间为 1L。 - `keepAliveTime` : 等待时间为 1L。
4. `unit`: 等待时间的单位为 TimeUnit.SECONDS。 - `unit`: 等待时间的单位为 TimeUnit.SECONDS。
5. `workQueue`:任务队列为 `ArrayBlockingQueue`,并且容量为 100; - `workQueue`:任务队列为 `ArrayBlockingQueue`,并且容量为 100;
6. `handler`:饱和策略为 `CallerRunsPolicy` - `handler`:饱和策略为 `CallerRunsPolicy`
**Output** **输出结构**
``` ```
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
@ -297,13 +345,15 @@ pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
``` ```
### 4.2 线程池原理分析 ### 线程池原理分析
承接 4.1 节,我们通过代码输出结果可以看出:**线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会) 我们通过前面的代码输出结果可以看出:**线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。 现在,我们就分析上面的输出内容来简单分析一下线程池原理。
**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。** 在 4.1 节中的 Demo 中我们使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: 为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。 在示例代码中,我们使用 `executor.execute(worker)`来提交一个任务到线程池中去。
这个方法非常重要,下面我们来看看它的源码:
```java ```java
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
@ -330,14 +380,14 @@ pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
return; return;
c = ctl.get(); c = ctl.get();
} }
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) { if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command)) if (!isRunning(recheck) && remove(command))
reject(command); reject(command);
// 如果当前线程池为空就新创建一个线程并执行。 // 如果当前工作线程数量为0新创建一个线程并执行。
else if (workerCountOf(recheck) == 0) else if (workerCountOf(recheck) == 0)
addWorker(null, false); addWorker(null, false);
} }
@ -348,11 +398,18 @@ pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
} }
``` ```
通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。 这里简单分析一下整个流程(对整个逻辑进行了简化,方便理解):
1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用`RejectedExecutionHandler.rejectedExecution()`方法。
下图是我为了省事直接从网上找到,原地址不明。
![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png) ![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png)
**`addWorker` 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。** `addWorker` 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
```java ```java
// 全局锁,并发操作必备 // 全局锁,并发操作必备
@ -457,17 +514,17 @@ pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
} }
``` ```
更多关于线程池源码分析的内容推荐这篇文章:硬核干货:[4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理](https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/) 更多关于线程池源码分析的内容推荐这篇文章:硬核干货:[4W 字从源码上分析 JUC 线程池 ThreadPoolExecutor 的实现原理](https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/)
现在,让我们在回到 4.1 节我们写的 Demo 现在应该是不是很容易就可以搞懂它的原理了呢? 现在,让我们在回到示例代码 现在应该是不是很容易就可以搞懂它的原理了呢?
没搞懂的话,也没关系,可以看看我的分析: 没搞懂的话,也没关系,可以看看我的分析:
> 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。 > 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
### 4.3 几个常见的对比 ### 几个常见的对比
#### 4.3.1 `Runnable` vs `Callable` #### `Runnable` vs `Callable`
`Runnable`自 Java 1.0 以来一直存在,但`Callable`仅在 Java 1.5 中引入,目的就是为了来处理`Runnable`不支持的用例。**`Runnable` 接口**不会返回结果或抛出检查异常,但是 **`Callable` 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 **`Runnable` 接口**,这样代码看起来会更加简洁。 `Runnable`自 Java 1.0 以来一直存在,但`Callable`仅在 Java 1.5 中引入,目的就是为了来处理`Runnable`不支持的用例。**`Runnable` 接口**不会返回结果或抛出检查异常,但是 **`Callable` 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 **`Runnable` 接口**,这样代码看起来会更加简洁。
@ -500,14 +557,14 @@ public interface Callable<V> {
``` ```
#### 4.3.2 `execute()` vs `submit()` #### `execute()` vs `submit()`
- `execute()`方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; - `execute()`方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
- `submit()`方法用于提交需要返回值的任务。线程池会返回一个 `Future` 类型的对象,通过这个 `Future` 对象可以判断任务是否执行成功,并且可以通过 `Future``get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `getlong timeoutTimeUnit unit`方法的话,如果在 `timeout` 时间内任务还没有执行完,就会抛出 `java.util.concurrent.TimeoutException` - `submit()`方法用于提交需要返回值的任务。线程池会返回一个 `Future` 类型的对象,通过这个 `Future` 对象可以判断任务是否执行成功,并且可以通过 `Future``get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `getlong timeoutTimeUnit unit`方法的话,如果在 `timeout` 时间内任务还没有执行完,就会抛出 `java.util.concurrent.TimeoutException`
这里只是为了演示使用,推荐使用 `ThreadPoolExecutor` 构造方法来创建线程池。 这里只是为了演示使用,推荐使用 `ThreadPoolExecutor` 构造方法来创建线程池。
示例1使用 `get() `方法获取返回值。 示例 1使用 `get()`方法获取返回值。
```java ```java
ExecutorService executorService = Executors.newFixedThreadPool(3); ExecutorService executorService = Executors.newFixedThreadPool(3);
@ -532,7 +589,7 @@ executorService.shutdown();
abc abc
``` ```
示例2使用 `getlong timeoutTimeUnit unit `方法获取返回值。 示例 2使用 `getlong timeoutTimeUnit unit`方法获取返回值。
```java ```java
ExecutorService executorService = Executors.newFixedThreadPool(3); ExecutorService executorService = Executors.newFixedThreadPool(3);
@ -558,110 +615,23 @@ Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205) at java.util.concurrent.FutureTask.get(FutureTask.java:205)
``` ```
#### 4.3.3 `shutdown()`VS`shutdownNow()` #### `shutdown()`VS`shutdownNow()`
- **`shutdown`** :关闭线程池,线程池的状态变为 `SHUTDOWN`。线程池不再接受新任务了,但是队列里的任务得执行完毕。 - **`shutdown`** :关闭线程池,线程池的状态变为 `SHUTDOWN`。线程池不再接受新任务了,但是队列里的任务得执行完毕。
- **`shutdownNow`** :关闭线程池,线程的状态变为 `STOP`。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。 - **`shutdownNow`** :关闭线程池,线程的状态变为 `STOP`。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
#### 4.3.2 `isTerminated()` VS `isShutdown()` #### `isTerminated()` VS `isShutdown()`
- **`isShutDown`** 当调用 `shutdown()` 方法后返回为 true。 - **`isShutDown`** 当调用 `shutdown()` 方法后返回为 true。
- **`isTerminated`** 当调用 `shutdown()` 方法后,并且所有提交的任务完成后返回为 true - **`isTerminated`** 当调用 `shutdown()` 方法后,并且所有提交的任务完成后返回为 true
### 4.4 加餐:`Callable`+`ThreadPoolExecutor`示例代码 ## 几种常见的内置线程池
`MyCallable.java` ### FixedThreadPool
```java #### 介绍
import java.util.concurrent.Callable; `FixedThreadPool` 被称为可重用固定线程数的线程池。通过 `Executors` 类中的相关源代码来看一下相关实现:
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回执行当前 Callable 的线程名字
return Thread.currentThread().getName();
}
}
```
`CallableDemo.java`
```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CallableDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<String>> futureList = new ArrayList<>();
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
//提交任务到线程池
Future<String> future = executor.submit(callable);
//将返回值 future 添加到 list我们可以通过 future 获得 执行 Callable 得到的返回值
futureList.add(future);
}
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//关闭线程池
executor.shutdown();
}
}
```
Output:
```
Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
```
## 5 几种常见的线程池详解
### 5.1 FixedThreadPool
#### 5.1.1 介绍
`FixedThreadPool` 被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:
```java ```java
/** /**
@ -685,9 +655,11 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
} }
``` ```
**从上面源代码可以看出新创建的 `FixedThreadPool``corePoolSize``maximumPoolSize` 都被设置为 nThreads这个 nThreads 参数是我们使用的时候自己传递的。** 从上面源代码可以看出新创建的 `FixedThreadPool``corePoolSize``maximumPoolSize` 都被设置为 `nThreads`,这个 `nThreads` 参数是我们使用的时候自己传递的。
#### 5.1.2 执行任务过程介绍 即使 `maximumPoolSize` 的值比 `corePoolSize` 大,也至多只会创建 `corePoolSize` 个线程。这是因为`FixedThreadPool` 使用的是容量为 `Integer.MAX_VALUE``LinkedBlockingQueue`(无界队列),队列永远不会被放满。
#### 执行任务过程介绍
`FixedThreadPool``execute()` 方法运行示意图该图片来源《Java 并发编程的艺术》): `FixedThreadPool``execute()` 方法运行示意图该图片来源《Java 并发编程的艺术》):
@ -695,22 +667,22 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
**上图说明:** **上图说明:**
1. 如果当前运行的线程数小于 corePoolSize 如果再来新任务的话,就创建新的线程来执行任务; 1. 如果当前运行的线程数小于 `corePoolSize` 如果再来新任务的话,就创建新的线程来执行任务;
2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 `LinkedBlockingQueue` 2. 当前运行的线程数等于 `corePoolSize` 后, 如果再来新任务的话,会将任务加入 `LinkedBlockingQueue`
3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 `LinkedBlockingQueue` 中获取任务来执行; 3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 `LinkedBlockingQueue` 中获取任务来执行;
#### 5.1.3 为什么不推荐使用`FixedThreadPool` #### 为什么不推荐使用`FixedThreadPool`
**`FixedThreadPool` 使用无界队列 `LinkedBlockingQueue`(队列的容量为 Integer.MAX_VALUE作为线程池的工作队列会对线程池带来如下影响 ** `FixedThreadPool` 使用无界队列 `LinkedBlockingQueue`(队列的容量为 Integer.MAX_VALUE作为线程池的工作队列会对线程池带来如下影响
1. 当线程池中的线程数达到 `corePoolSize` 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize 1. 当线程池中的线程数达到 `corePoolSize` 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 `corePoolSize`
2. 由于使用无界队列时 `maximumPoolSize` 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 `FixedThreadPool`的源码可以看出创建的 `FixedThreadPool``corePoolSize``maximumPoolSize` 被设置为同一个值。 2. 由于使用无界队列时 `maximumPoolSize` 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 `FixedThreadPool`的源码可以看出创建的 `FixedThreadPool``corePoolSize``maximumPoolSize` 被设置为同一个值。
3. 由于 1 和 2使用无界队列时 `keepAliveTime` 将是一个无效参数; 3. 由于 1 和 2使用无界队列时 `keepAliveTime` 将是一个无效参数;
4. 运行中的 `FixedThreadPool`(未执行 `shutdown()``shutdownNow()`)不会拒绝任务,在任务比较多的时候会导致 OOM内存溢出 4. 运行中的 `FixedThreadPool`(未执行 `shutdown()``shutdownNow()`)不会拒绝任务,在任务比较多的时候会导致 OOM内存溢出
### 5.2 SingleThreadExecutor 详解 ### SingleThreadExecutor
#### 5.2.1 介绍 #### 介绍
`SingleThreadExecutor` 是只有一个线程的线程池。下面看看**SingleThreadExecutor 的实现:** `SingleThreadExecutor` 是只有一个线程的线程池。下面看看**SingleThreadExecutor 的实现:**
@ -736,9 +708,9 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
} }
``` ```
从上面源代码可以看出新创建的 `SingleThreadExecutor``corePoolSize``maximumPoolSize` 都被设置为 1.其他参数和 `FixedThreadPool` 相同。 从上面源代码可以看出新创建的 `SingleThreadExecutor``corePoolSize``maximumPoolSize` 都被设置为 1其他参数和 `FixedThreadPool` 相同。
#### 5.2.2 执行任务过程介绍 #### 执行任务过程介绍
`SingleThreadExecutor` 的运行示意图该图片来源《Java 并发编程的艺术》): `SingleThreadExecutor` 的运行示意图该图片来源《Java 并发编程的艺术》):
![SingleThreadExecutor的运行示意图](./images/java-thread-pool-summary/SingleThreadExecutor.png) ![SingleThreadExecutor的运行示意图](./images/java-thread-pool-summary/SingleThreadExecutor.png)
@ -749,13 +721,13 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
2. 当前线程池中有一个运行的线程后,将任务加入 `LinkedBlockingQueue` 2. 当前线程池中有一个运行的线程后,将任务加入 `LinkedBlockingQueue`
3. 线程执行完当前的任务后,会在循环中反复从`LinkedBlockingQueue` 中获取任务来执行; 3. 线程执行完当前的任务后,会在循环中反复从`LinkedBlockingQueue` 中获取任务来执行;
#### 5.2.3 为什么不推荐使用`SingleThreadExecutor` #### 为什么不推荐使用`SingleThreadExecutor`
`SingleThreadExecutor` 使用无界队列 `LinkedBlockingQueue` 作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE`SingleThreadExecutor` 使用无界队列作为线程池的工作队列会对线程池带来的影响与 `FixedThreadPool` 相同。说简单点就是可能会导致 OOM `SingleThreadExecutor` `FixedThreadPool` 一样,使用的都是容量为 `Integer.MAX_VALUE``LinkedBlockingQueue`(无界队列)作为线程池的工作队列`SingleThreadExecutor` 使用无界队列作为线程池的工作队列会对线程池带来的影响与 `FixedThreadPool` 相同。说简单点,就是可能会导致 OOM。
### 5.3 CachedThreadPool 详解 ### CachedThreadPool
#### 5.3.1 介绍 #### 介绍
`CachedThreadPool` 是一个会根据需要创建新线程的线程池。下面通过源码来看看 `CachedThreadPool` 的实现: `CachedThreadPool` 是一个会根据需要创建新线程的线程池。下面通过源码来看看 `CachedThreadPool` 的实现:
@ -782,7 +754,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
`CachedThreadPool``corePoolSize` 被设置为空0`maximumPoolSize`被设置为 `Integer.MAX.VALUE`,即它是无界的,这也就意味着如果主线程提交任务的速度高于 `maximumPool` 中线程处理任务的速度时,`CachedThreadPool` 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。 `CachedThreadPool``corePoolSize` 被设置为空0`maximumPoolSize`被设置为 `Integer.MAX.VALUE`,即它是无界的,这也就意味着如果主线程提交任务的速度高于 `maximumPool` 中线程处理任务的速度时,`CachedThreadPool` 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
#### 5.3.2 执行任务过程介绍 #### 执行任务过程介绍
`CachedThreadPool``execute()` 方法的执行示意图该图片来源《Java 并发编程的艺术》): `CachedThreadPool``execute()` 方法的执行示意图该图片来源《Java 并发编程的艺术》):
![CachedThreadPool的execute()方法的执行示意图](./images/java-thread-pool-summary/CachedThreadPool-execute.png) ![CachedThreadPool的execute()方法的执行示意图](./images/java-thread-pool-summary/CachedThreadPool-execute.png)
@ -792,82 +764,51 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
1. 首先执行 `SynchronousQueue.offer(Runnable task)` 提交任务到任务队列。如果当前 `maximumPool` 中有闲线程正在执行 `SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)`,那么主线程执行 offer 操作与空闲线程执行的 `poll` 操作配对成功,主线程把任务交给空闲线程执行,`execute()`方法执行完成,否则执行下面的步骤 2 1. 首先执行 `SynchronousQueue.offer(Runnable task)` 提交任务到任务队列。如果当前 `maximumPool` 中有闲线程正在执行 `SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)`,那么主线程执行 offer 操作与空闲线程执行的 `poll` 操作配对成功,主线程把任务交给空闲线程执行,`execute()`方法执行完成,否则执行下面的步骤 2
2. 当初始 `maximumPool` 为空,或者 `maximumPool` 中没有空闲线程时,将没有线程执行 `SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)`。这种情况下,步骤 1 将失败,此时 `CachedThreadPool` 会创建新线程执行任务execute 方法执行完成; 2. 当初始 `maximumPool` 为空,或者 `maximumPool` 中没有空闲线程时,将没有线程执行 `SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)`。这种情况下,步骤 1 将失败,此时 `CachedThreadPool` 会创建新线程执行任务execute 方法执行完成;
#### 5.3.3 为什么不推荐使用`CachedThreadPool` #### 为什么不推荐使用`CachedThreadPool`
`CachedThreadPool`允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。 `CachedThreadPool` 使用的是同步队列 `SynchronousQueue`, 允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。
## 6 ScheduledThreadPoolExecutor 详解 ### ScheduledThreadPool
**`ScheduledThreadPoolExecutor` 主要用来在给定的延迟后运行任务,或者定期执行任务。** 这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下它的思想即可。 #### 介绍
### 6.1 简介 `ScheduledThreadPool` 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下即可。
`ScheduledThreadPoolExecutor` 使用的任务队列 `DelayQueue` 封装了一个 `PriorityQueue``PriorityQueue` 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(`ScheduledFutureTask``time` 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(`ScheduledFutureTask``squenceNumber` 变量小的先执行)。 ```java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
```
**`ScheduledThreadPoolExecutor``Timer` 的比较:** `ScheduledThreadPool` 是通过 `ScheduledThreadPoolExecutor` 创建的,使用的`DelayedWorkQueue`(延迟阻塞队列)作为线程池的任务队列。
`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容原来容量的 1/2即永远不会阻塞最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。
`ScheduledThreadPoolExecutor` 继承了 `ThreadPoolExecutor`,所以创建 `ScheduledThreadExecutor` 本质也是创建一个 `ThreadPoolExecutor` 线程池,只是传入的参数不相同。
```java
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
```
#### ScheduledThreadPoolExecutor 和 Timer 对比
- `Timer` 对系统时钟的变化敏感,`ScheduledThreadPoolExecutor`不是; - `Timer` 对系统时钟的变化敏感,`ScheduledThreadPoolExecutor`不是;
- `Timer` 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 `ScheduledThreadPoolExecutor` 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory你可以完全控制创建的线程; - `Timer` 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 `ScheduledThreadPoolExecutor` 可以配置任意数量的线程。 此外,如果你想(通过提供 `ThreadFactory`),你可以完全控制创建的线程;
- 在`TimerTask` 中抛出的运行时异常会杀死一个线程,从而导致 `Timer` 死机:-( ...即计划任务将不再运行。`ScheduledThreadExecutor` 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 `afterExecute` 方法`ThreadPoolExecutor`)。抛出异常的任务将被取消,但其他任务将继续运行。 - 在`TimerTask` 中抛出的运行时异常会杀死一个线程,从而导致 `Timer` 死机即计划任务将不再运行。`ScheduledThreadExecutor` 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 `afterExecute` 方法`ThreadPoolExecutor`)。抛出异常的任务将被取消,但其他任务将继续运行。
**综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。** 关于定时任务的详细介绍,可以看这篇文章:[Java 定时任务详解](https://javaguide.cn/system-design/schedule-task.html) 。
> 关于定时任务的详细介绍,小伙伴们可以在 JavaGuide 的项目首页搜索“定时任务”找到对应的原创内容。 ## 线程池最佳实践
### 6.2 运行机制 [Java 线程池最佳实践](https://javaguide.cn/java/concurrent/java-thread-pool-best-practices.html)这篇文章总结了一些使用线程池的时候应该注意的东西,实际项目使用线程池之前可以看看。
![ScheduledThreadPoolExecutor运行机制](./images/java-thread-pool-summary/ScheduledThreadPoolExecutor机制.png) ## 参考
**`ScheduledThreadPoolExecutor` 的执行主要分为两大部分:**
1. 当调用 `ScheduledThreadPoolExecutor`**`scheduleAtFixedRate()`** 方法或者 **`scheduleWithFixedDelay()`** 方法时,会向 `ScheduledThreadPoolExecutor`**`DelayQueue`** 添加一个实现了 **`RunnableScheduledFuture`** 接口的 **`ScheduledFutureTask`** 。
2. 线程池中的线程从 `DelayQueue` 中获取 `ScheduledFutureTask`,然后执行任务。
**`ScheduledThreadPoolExecutor` 为了实现周期性的执行任务,对 `ThreadPoolExecutor`做了如下修改:**
- 使用 **`DelayQueue`** 作为任务队列;
- 获取任务的方不同
- 执行周期任务后,增加了额外的处理
### 6.3 ScheduledThreadPoolExecutor 执行周期任务的步骤
![ScheduledThreadPoolExecutor执行周期任务的步骤](./images/java-thread-pool-summary/ScheduledThreadPoolExecutor执行周期任务步骤.png)
1. 线程 1 从 `DelayQueue` 中获取已到期的 `ScheduledFutureTaskDelayQueue.take()`。到期任务是指 `ScheduledFutureTask`的 time 大于等于当前系统的时间;
2. 线程 1 执行这个 `ScheduledFutureTask`
3. 线程 1 修改 `ScheduledFutureTask` 的 time 变量为下次将要被执行的时间;
4. 线程 1 把这个修改 time 之后的 `ScheduledFutureTask` 放回 `DelayQueue` 中(`DelayQueue.add()`)。
## 7 线程池大小确定
**线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。**
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
> 上下文切换:
>
> 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用为了让这些线程都能得到有效执行CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。**任务从保存到再加载的过程就是一次上下文切换**。
>
> 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
>
> Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
**类比于现实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。**
**如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的 CPU 根本没有得到充分利用。**
**但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。**
有一个简单并且适用面比较广的公式:
- **CPU 密集型任务(N+1)** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 NCPU 核心数)+1比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断或者其它原因导致的任务暂停而带来的影响。一旦任务暂停CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- **I/O 密集型任务(2N)** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
**如何判断是 CPU 密集任务还是 IO 密集任务?**
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
## 8 参考
- 《Java 并发编程的艺术》 - 《Java 并发编程的艺术》
- [Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example](https://www.journaldev.com/2340/java-scheduler-scheduledexecutorservice-scheduledthreadpoolexecutor-example "Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example") - [Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example](https://www.journaldev.com/2340/java-scheduler-scheduledexecutorservice-scheduledthreadpoolexecutor-example "Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example")

View File

@ -134,7 +134,7 @@ icon: "xitongsheji"
### 多线程 ### 多线程
- **[Hippo-4J](https://github.com/opengoofy/hippo4j)** :一款强大的动态线程池框架,解决了传统线程池使用存在的一些痛点比如线程池参数没办法动态修改、不支持运行时变量的传递、无法执行优雅关闭。除了支持动态修改线程池参数、线程池任务传递上下文,还支持通知报警、运行监控等开箱即用的功能。 - **[Hippo-4J](https://github.com/opengoofy/hippo4j)** :一款强大的动态线程池框架,解决了传统线程池使用存在的一些痛点比如线程池参数没办法动态修改、不支持运行时变量的传递、无法执行优雅关闭。除了支持动态修改线程池参数、线程池任务传递上下文,还支持通知报警、运行监控等开箱即用的功能。
- **[Dynamic Tp](https://github.com/dromara/dynamic-tp)** 一款基于 SpringBoot 的轻量级动态线程池,参考[美团线程池实践](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html),内置监控告警功能,提供多种报警维度 - **[Dynamic Tp](https://github.com/dromara/dynamic-tp)** 轻量级动态线程池内置监控告警功能集成三方中间件线程池管理基于主流配置中心已支持Nacos、ApolloZookeeper、Consul、Etcd可通过SPI自定义实现
- **[asyncTool](https://gitee.com/jd-platform-opensource/asyncTool)** : 京东的一位大佬开源的多线程工具库,里面大量使用到了 `CompletableFuture` ,可以解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。 - **[asyncTool](https://gitee.com/jd-platform-opensource/asyncTool)** : 京东的一位大佬开源的多线程工具库,里面大量使用到了 `CompletableFuture` ,可以解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。
### 缓存 ### 缓存

View File

@ -82,7 +82,7 @@ timer.schedule(task, delay);
![](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/20210607154324712.png) ![](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/20210607154324712.png)
`ScheduledThreadPoolExecutor` 本身就是一个线程池,支持任务并发执行。并且,其内部使用 `DelayQueue` 作为任务队列。 `ScheduledThreadPoolExecutor` 本身就是一个线程池,支持任务并发执行。并且,其内部使用 `DelayedWorkQueue` 作为任务队列。
```java ```java
// 示例代码: // 示例代码: