diff --git a/docs/java/Multithread/JavaConcurrencyAdvancedCommonInterviewQuestions.md b/docs/java/Multithread/JavaConcurrencyAdvancedCommonInterviewQuestions.md index 2fba760b..c3143b25 100644 --- a/docs/java/Multithread/JavaConcurrencyAdvancedCommonInterviewQuestions.md +++ b/docs/java/Multithread/JavaConcurrencyAdvancedCommonInterviewQuestions.md @@ -509,7 +509,192 @@ public interface Callable { - **`ThreadPoolExecutor.DiscardPolicy`:** 不处理新任务,直接丢弃掉。 - **`ThreadPoolExecutor.DiscardOldestPolicy`:** 此策略将丢弃最早的未处理的任务请求。 -举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。) +举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了) + +### 4.6 一个简单的线程池Demo:`Runnable`+`ThreadPoolExecutor` + +为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。 + +首先创建一个 `Runnable` 接口的实现类(当然也可以是 `Callable` 接口,我们上面也说了两者的区别。) + +`MyRunnable.java` + +```java +import java.util.Date; + +/** + * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 + * @author shuang.kou + */ +public class MyRunnable implements Runnable { + + private String command; + + public MyRunnable(String s) { + this.command = s; + } + + @Override + public void run() { + System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); + processCommand(); + System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); + } + + private void processCommand() { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public String toString() { + return this.command; + } +} + +``` + +编写测试程序,我们这里以阿里巴巴推荐的使用 `ThreadPoolExecutor` 构造函数自定义参数的方式来创建线程池。 + +`ThreadPoolExecutorDemo.java` + +```java +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ThreadPoolExecutorDemo { + + private static final int CORE_POOL_SIZE = 5; + private static final int MAX_POOL_SIZE = 10; + private static final int QUEUE_CAPACITY = 100; + private static final Long KEEP_ALIVE_TIME = 1L; + public static void main(String[] args) { + + //使用阿里巴巴推荐的创建线程池的方式 + //通过ThreadPoolExecutor构造函数自定义参数创建 + ThreadPoolExecutor executor = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + KEEP_ALIVE_TIME, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(QUEUE_CAPACITY), + new ThreadPoolExecutor.CallerRunsPolicy()); + + for (int i = 0; i < 10; i++) { + //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) + Runnable worker = new MyRunnable("" + i); + //执行Runnable + executor.execute(worker); + } + //终止线程池 + executor.shutdown(); + while (!executor.isTerminated()) { + } + System.out.println("Finished all threads"); + } +} + +``` + +可以看到我们上面的代码指定了: + +1. `corePoolSize`: 核心线程数为 5。 +2. `maximumPoolSize` :最大线程数 10 +3. `keepAliveTime` : 等待时间为 1L。 +4. `unit`: 等待时间的单位为 TimeUnit.SECONDS。 +5. `workQueue`:任务队列为 `ArrayBlockingQueue`,并且容量为 100; +6. `handler`:饱和策略为 `CallerRunsPolicy`。 + +**Output:** + +``` +pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019 +pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019 +pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019 +pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019 +pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019 +pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019 +pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019 +pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019 +pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019 +pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019 +pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019 + +``` + +### 4.7 线程池原理分析 + +承接 4.6 节,我们通过代码输出结果可以看出:**线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会) + +现在,我们就分析上面的输出内容来简单分析一下线程池原理。 + +**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。**在 4.6 节中的 Demo 中我们使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: + +```java + // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); + + private static int workerCountOf(int c) { + return c & CAPACITY; + } + + private final BlockingQueue workQueue; + + public void execute(Runnable command) { + // 如果任务为null,则抛出异常。 + if (command == null) + throw new NullPointerException(); + // ctl 中保存的线程池当前的一些状态信息 + int c = ctl.get(); + + // 下面会涉及到 3 步 操作 + // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize + // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 + // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去 + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 + if (!isRunning(recheck) && remove(command)) + reject(command); + // 如果当前线程池为空就新创建一个线程并执行。 + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 + //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 + else if (!addWorker(command, false)) + reject(command); + } +``` + +通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。 + +![图解线程池实现原理](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-7/图解线程池实现原理.png) + +现在,让我们在回到 4.6 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢? + +没搞懂的话,也没关系,可以看看我的分析: + +> 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务之行完成后,才会之行剩下的 5 个任务。 ## 5. Atomic 原子类