From 9f66ae82de48e57d0e16dc96e2dcbeac62d18ec1 Mon Sep 17 00:00:00 2001 From: guide Date: Thu, 15 Dec 2022 22:42:21 +0800 Subject: [PATCH] =?UTF-8?q?[docs=20update]=E5=AE=8C=E5=96=84Java=20?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=B8=B8=E8=A7=81=E9=9D=A2=E8=AF=95=E9=A2=98?= =?UTF-8?q?=E6=80=BB=E7=BB=93=EF=BC=88=E4=B8=8B=EF=BC=89=E7=9A=84=E5=86=85?= =?UTF-8?q?=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/java/concurrent/aqs.md | 17 +- .../java-concurrent-questions-03.md | 854 +++++++++--------- .../concurrent/java-thread-pool-summary.md | 25 +- 3 files changed, 470 insertions(+), 426 deletions(-) diff --git a/docs/java/concurrent/aqs.md b/docs/java/concurrent/aqs.md index 593296f1..316b8613 100644 --- a/docs/java/concurrent/aqs.md +++ b/docs/java/concurrent/aqs.md @@ -114,7 +114,20 @@ protected boolean isHeldExclusively() #### 介绍 -`synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。 +`synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。 + +Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到贡献资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 + +```java  +// 初始共享资源数量 +final Semaphore semaphore = new Semaphore(5); +// 获取1个许可 +semaphore.acquire(); +// 释放1个许可 +semaphore.release(); +``` + +当初始的资源个数为 1 的时候,`Semaphore` 退化为排他锁。 `Semaphore` 有两种模式:。 @@ -199,7 +212,7 @@ public class SemaphoreExample1 { public static void main(String[] args) throws InterruptedException { // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); - // 一次只能允许执行的线程数量。 + // 初始许可证数量 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { diff --git a/docs/java/concurrent/java-concurrent-questions-03.md b/docs/java/concurrent/java-concurrent-questions-03.md index 478b8bd3..e5dc54b1 100644 --- a/docs/java/concurrent/java-concurrent-questions-03.md +++ b/docs/java/concurrent/java-concurrent-questions-03.md @@ -26,71 +26,7 @@ head: - **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。 - **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 -### 实现 Runnable 接口和 Callable 接口的区别 - -`Runnable`自 Java 1.0 以来一直存在,但`Callable`仅在 Java 1.5 中引入,目的就是为了来处理`Runnable`不支持的用例。**`Runnable` 接口** 不会返回结果或抛出检查异常,但是 **`Callable` 接口** 可以。所以,如果任务不需要返回结果或抛出异常推荐使用 **`Runnable` 接口** ,这样代码看起来会更加简洁。 - -工具类 `Executors` 可以实现将 `Runnable` 对象转换成 `Callable` 对象。(`Executors.callable(Runnable task)` 或 `Executors.callable(Runnable task, Object result)`)。 - -`Runnable.java` - -```java -@FunctionalInterface -public interface Runnable { - /** - * 被线程执行,没有返回值也无法抛出异常 - */ - public abstract void run(); -} -``` - -`Callable.java` - -```java -@FunctionalInterface -public interface Callable { - /** - * 计算结果,或在无法这样做时抛出异常。 - * @return 计算得出的结果 - * @throws 如果无法计算结果,则抛出异常 - */ - V call() throws Exception; -} -``` - -### 执行 execute()方法和 submit()方法的区别是什么呢? - -1. **`execute()`方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;** -2. **`submit()`方法用于提交需要返回值的任务。线程池会返回一个 `Future` 类型的对象,通过这个 `Future` 对象可以判断任务是否执行成功**,并且可以通过 `Future` 的 `get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `get(long timeout,TimeUnit unit)`方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。 - -我们以 **`AbstractExecutorService` 接口** 中的一个 `submit` 方法为例子来看看源代码: - -```java -public Future submit(Runnable task) { - if (task == null) throw new NullPointerException(); - RunnableFuture ftask = newTaskFor(task, null); - execute(ftask); - return ftask; -} -``` - -上面方法调用的 `newTaskFor` 方法返回了一个 `FutureTask` 对象。 - -```java -protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return new FutureTask(runnable, value); -} -``` - -我们再来看看`execute()`方法: - -```java -public void execute(Runnable command) { - ... -} -``` - -### 如何创建线程池 +### 如何创建线程池? 《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险 @@ -115,41 +51,36 @@ public void execute(Runnable command) { ![Executor框架的工具类](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/Executor框架的工具类.png) -### ThreadPoolExecutor 类分析 - -`ThreadPoolExecutor` 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。 +### 核心线程数和最大线程数有什么区别? ```java -/** - * 用给定的初始参数创建一个新的ThreadPoolExecutor。 - */ -public ThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory, - RejectedExecutionHandler handler) { - if (corePoolSize < 0 || - maximumPoolSize <= 0 || - maximumPoolSize < corePoolSize || - keepAliveTime < 0) + /** + * 用给定的初始参数创建一个新的ThreadPoolExecutor。 + */ + public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量 + int maximumPoolSize,//线程池的最大线程数 + long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间 + TimeUnit unit,//时间单位 + BlockingQueue workQueue,//任务队列,用来储存等待执行任务的队列 + ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可 + RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务 + ) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) throw new IllegalArgumentException(); - if (workQueue == null || threadFactory == null || handler == null) - throw new NullPointerException(); - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.workQueue = workQueue; - this.keepAliveTime = unit.toNanos(keepAliveTime); - this.threadFactory = threadFactory; - this.handler = handler; -} + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } ``` -**下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。** - -#### `ThreadPoolExecutor`构造函数重要参数分析 - **`ThreadPoolExecutor` 3 个最重要的参数:** - **`corePoolSize` :** 核心线程数定义了最小可以同时运行的线程数量。 @@ -163,9 +94,7 @@ public ThreadPoolExecutor(int corePoolSize, 3. **`threadFactory`** :executor 创建新线程的时候会用到。 4. **`handler`** :饱和策略。关于饱和策略下面单独介绍一下。 -#### `ThreadPoolExecutor` 饱和策略 - -**`ThreadPoolExecutor` 饱和策略定义:** +### 线程池的饱和策略有哪些? 如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,`ThreadPoolTaskExecutor` 定义一些策略: @@ -176,375 +105,353 @@ public ThreadPoolExecutor(int corePoolSize, 举个例子: Spring 通过 `ThreadPoolTaskExecutor` 或者我们直接通过 `ThreadPoolExecutor` 的构造函数创建线程池的时候,当我们不指定 `RejectedExecutionHandler` 饱和策略的话来配置线程池的时候默认使用的是 `ThreadPoolExecutor.AbortPolicy`。在默认情况下,`ThreadPoolExecutor` 将抛出 `RejectedExecutionException` 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 `ThreadPoolExecutor.CallerRunsPolicy`。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 `ThreadPoolExecutor` 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了) -### 一个简单的线程池 Demo +### 线程池原理是什么? -为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。 +**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。** -首先创建一个 `Runnable` 接口的实现类(当然也可以是 `Callable` 接口,我们上面也说了两者的区别。) - -`MyRunnable.java` +我们可以使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: ```java -import java.util.Date; + // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); -/** - * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 - * @author shuang.kou - */ -public class MyRunnable implements Runnable { - - private String command; - - public MyRunnable(String s) { - this.command = s; + private static int workerCountOf(int c) { + return c & CAPACITY; } + //任务队列 + private final BlockingQueue workQueue; - @Override - public void run() { - System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); - processCommand(); - System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); - } + public void execute(Runnable command) { + // 如果任务为null,则抛出异常。 + if (command == null) + throw new NullPointerException(); + // ctl 中保存的线程池当前的一些状态信息 + int c = ctl.get(); - private void processCommand() { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); + // 下面会涉及到 3 步 操作 + // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize + // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); } - } - - @Override - public String toString() { - return this.command; - } -} - -``` - -编写测试程序,我们这里以阿里巴巴推荐的使用 `ThreadPoolExecutor` 构造函数自定义参数的方式来创建线程池。 - -`ThreadPoolExecutorDemo.java` - -```java -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class ThreadPoolExecutorDemo { - - private static final int CORE_POOL_SIZE = 5; - private static final int MAX_POOL_SIZE = 10; - private static final int QUEUE_CAPACITY = 100; - private static final Long KEEP_ALIVE_TIME = 1L; - public static void main(String[] args) { - - //使用阿里巴巴推荐的创建线程池的方式 - //通过ThreadPoolExecutor构造函数自定义参数创建 - ThreadPoolExecutor executor = new ThreadPoolExecutor( - CORE_POOL_SIZE, - MAX_POOL_SIZE, - KEEP_ALIVE_TIME, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(QUEUE_CAPACITY), - new ThreadPoolExecutor.CallerRunsPolicy()); - - for (int i = 0; i < 10; i++) { - //创建 MyRunnable 对象(MyRunnable 类实现了Runnable 接口) - Runnable worker = new MyRunnable("" + i); - //执行Runnable - executor.execute(worker); + // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里 + // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去 + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 + if (!isRunning(recheck) && remove(command)) + reject(command); + // 如果当前线程池为空就新创建一个线程并执行。 + else if (workerCountOf(recheck) == 0) + addWorker(null, false); } - //终止线程池 - executor.shutdown(); - while (!executor.isTerminated()) { - } - System.out.println("Finished all threads"); - } -} -``` - -可以看到我们上面的代码指定了: - -1. `corePoolSize`: 核心线程数为 5。 -2. `maximumPoolSize` :最大线程数 10 -3. `keepAliveTime` : 等待时间为 1L。 -4. `unit`: 等待时间的单位为 TimeUnit.SECONDS。 -5. `workQueue`:任务队列为 `ArrayBlockingQueue`,并且容量为 100; -6. `handler`:饱和策略为 `CallerRunsPolicy`。 - -**Output:** - -``` -pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 -pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020 -pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020 -pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020 -pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020 -pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020 -pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020 -pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020 -pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020 -pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020 -pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020 - -``` - -### 线程池原理分析 - -承接 4.6 节,我们通过代码输出结果可以看出:**线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会) - -现在,我们就分析上面的输出内容来简单分析一下线程池原理。 - -**为了搞懂线程池的原理,我们需要首先分析一下 `execute`方法。** 在 4.6 节中的 Demo 中我们使用 `executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: - -```java -// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) -private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); - -private static int workerCountOf(int c) { - return c & CAPACITY; -} - -private final BlockingQueue workQueue; - -public void execute(Runnable command) { - // 如果任务为null,则抛出异常。 - if (command == null) - throw new NullPointerException(); - // ctl 中保存的线程池当前的一些状态信息 - int c = ctl.get(); - - // 下面会涉及到 3 步 操作 - // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize - // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 - if (workerCountOf(c) < corePoolSize) { - if (addWorker(command, true)) - return; - c = ctl.get(); - } - // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里 - // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去 - if (isRunning(c) && workQueue.offer(command)) { - int recheck = ctl.get(); - // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 - if (!isRunning(recheck) && remove(command)) + //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 + //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 + else if (!addWorker(command, false)) reject(command); - // 如果当前线程池为空就新创建一个线程并执行。 - else if (workerCountOf(recheck) == 0) - addWorker(null, false); } - //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 - //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 - else if (!addWorker(command, false)) - reject(command); -} ``` 通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。 -![图解线程池实现原理](./images/java-thread-pool-summary/图解线程池实现原理.png) +![图解线程池实现原理](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/%E5%9B%BE%E8%A7%A3%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.png) -现在,让我们在回到 4.6 节我们写的 Demo, 现在是不是很容易就可以搞懂它的原理了呢? +**`addWorker` 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。** -没搞懂的话,也没关系,可以看看我的分析: +```java + // 全局锁,并发操作必备 + private final ReentrantLock mainLock = new ReentrantLock(); + // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合 + private int largestPoolSize; + // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合 + private final HashSet workers = new HashSet<>(); + //获取线程池状态 + private static int runStateOf(int c) { return c & ~CAPACITY; } + //判断线程池的状态是否为 Running + private static boolean isRunning(int c) { + return c < SHUTDOWN; + } -> 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。 + + /** + * 添加新的工作线程到线程池 + * @param firstTask 要执行 + * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小 + * @return 添加成功就返回true否则返回false + */ + private boolean addWorker(Runnable firstTask, boolean core) { + retry: + for (;;) { + //这两句用来获取线程池的状态 + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + + for (;;) { + //获取线程池中工作的线程的数量 + int wc = workerCountOf(c); + // core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + //原子操作将workcount的数量加1 + if (compareAndIncrementWorkerCount(c)) + break retry; + // 如果线程的状态改变了就再次执行上述操作 + c = ctl.get(); + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + // 标记工作线程是否启动成功 + boolean workerStarted = false; + // 标记工作线程是否创建成功 + boolean workerAdded = false; + Worker w = null; + try { + + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + // 加锁 + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + //获取线程池状态 + int rs = runStateOf(ctl.get()); + //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中 + //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker + // firstTask == null证明只新建线程而不执行任务 + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + workers.add(w); + //更新当前工作线程的最大容量 + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + // 工作线程是否启动成功 + workerAdded = true; + } + } finally { + // 释放锁 + mainLock.unlock(); + } + //// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例 + if (workerAdded) { + t.start(); + /// 标记线程启动成功 + workerStarted = true; + } + } + } finally { + // 线程启动失败,需要从工作线程中移除对应的Worker + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; + } +``` + +更多关于线程池源码分析的内容推荐这篇文章:硬核干货:[4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理](https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/) + +### 如何设定线程池的大小? + +**线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。** + +很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:**并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。** 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了**上下文切换**成本。不清楚什么是上下文切换的话,可以看我下面的介绍。 + +> 上下文切换: +> +> 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。**任务从保存到再加载的过程就是一次上下文切换**。 +> +> 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 +> +> Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。 + +**类比于现实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。** + +**如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。** + +**但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。** + +有一个简单并且适用面比较广的公式: + +- **CPU 密集型任务(N+1):** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。 +- **I/O 密集型任务(2N):** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 + +**如何判断是 CPU 密集任务还是 IO 密集任务?** + +CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。 ## Atomic 原子类 -### 介绍一下 Atomic 原子类 - -`Atomic` 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。 - -所以,所谓原子类说简单点就是具有原子/原子操作特征的类。 - -并发包 `java.util.concurrent` 的原子类都存放在`java.util.concurrent.atomic`下,如下图所示。 - -![JUC原子类概览](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/JUC原子类概览.png) - -### JUC 包中的原子类是哪 4 类? - -**基本类型** - -使用原子的方式更新基本类型 - -- `AtomicInteger`:整型原子类 -- `AtomicLong`:长整型原子类 -- `AtomicBoolean`:布尔型原子类 - -**数组类型** - -使用原子的方式更新数组里的某个元素 - -- `AtomicIntegerArray`:整型数组原子类 -- `AtomicLongArray`:长整型数组原子类 -- `AtomicReferenceArray`:引用类型数组原子类 - -**引用类型** - -- `AtomicReference`:引用类型原子类 -- `AtomicStampedReference`:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。 -- `AtomicMarkableReference` :原子更新带有标记位的引用类型 - -**对象的属性修改类型** - -- `AtomicIntegerFieldUpdater`:原子更新整型字段的更新器 -- `AtomicLongFieldUpdater`:原子更新长整型字段的更新器 -- `AtomicReferenceFieldUpdater`:原子更新引用类型字段的更新器 - -### 讲讲 AtomicInteger 的使用 - -**`AtomicInteger` 类常用方法** - -```java -public final int get() //获取当前的值 -public final int getAndSet(int newValue)//获取当前的值,并设置新的值 -public final int getAndIncrement()//获取当前的值,并自增 -public final int getAndDecrement() //获取当前的值,并自减 -public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 -boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) -public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。 -``` - -**`AtomicInteger` 类的使用示例** - -使用 `AtomicInteger` 之后,不用对 `increment()` 方法加锁也可以保证线程安全。 - -```java -class AtomicIntegerTest { - private AtomicInteger count = new AtomicInteger(); - //使用AtomicInteger之后,不需要对该方法加锁,也可以实现线程安全。 - public void increment() { - count.incrementAndGet(); - } - - public int getCount() { - return count.get(); - } -} - -``` - -### 能不能给我简单介绍一下 AtomicInteger 类的原理 - -- [浅谈AtomicInteger实现原理](https://github.com/summerHearts/JavaArchitecture/blob/master/Concurrent/浅谈AtomicInteger实现原理.md) -- [Java实现CAS的原理](https://tobebetterjavaer.com/thread/cas.html) +Atomic 原子类部分的内容我单独写了一篇文章来总结: [Atomic 原子类总结](./atomic-classes.md) 。 ## AQS -### AQS 介绍 +### AQS 是什么? -AQS 的全称为(`AbstractQueuedSynchronizer`),这个类在` java.util.concurrent.locks `包下面。 +AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面。 -![AQS类](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/AQS类.png) +![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/AQS.png) -AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出大量应用广泛的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`,`FutureTask` 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。 - -### AQS 原理分析 - -AQS 原理这部分参考了部分博客,在 6.2 节末尾放了链接。 - -> 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参加,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。 - -下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。 - -#### AQS 原理概览 - -**AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。** - -> CLH(Craig,Landin and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。 - -看个 AQS(AbstractQueuedSynchronizer)原理图: - -![AQS原理图](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-6/AQS原理图.png) - -AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。 +AQS 就是一个抽象类,主要用来构建锁和同步器。 ```java -private volatile int state;//共享变量,使用volatile修饰保证线程可见性 +public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { +} ``` -状态信息通过 protected 类型的 getState,setState,compareAndSetState 进行操作 +AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock`,`Semaphore`,其他的诸如 `ReentrantReadWriteLock`,`SynchronousQueue`等等皆是基于 AQS 的。 + +### AQS 的原理是什么? + +AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 **CLH 队列锁** 实现的,即将暂时获取不到锁的线程加入到队列中。 + +CLH(Craig,Landin,and Hagersten) 队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。在 CLH 同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。 + +CLH 队列结构如下图所示: + +![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/40cb932a64694262993907ebda6a0bfe~tplv-k3u1fbpfcp-zoom-1.image) + +AQS(`AbstractQueuedSynchronizer`)的核心原理图(图源[Java 并发之 AQS 详解](https://www.cnblogs.com/waterystone/p/4920797.html))如下: + +![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/Java%20%E7%A8%8B%E5%BA%8F%E5%91%98%E5%BF%85%E5%A4%87%EF%BC%9A%E5%B9%B6%E5%8F%91%E7%9F%A5%E8%AF%86%E7%B3%BB%E7%BB%9F%E6%80%BB%E7%BB%93/CLH.png) + +AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **线程等待队列** 来完成获取资源线程的排队工作。 + +`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。 + +```java +// 共享变量,使用volatile修饰保证线程可见性 +private volatile int state; +``` + +另外,状态信息 `state` 可以通过 `protected` 类型的`getState()`、`setState()`和`compareAndSetState()` 进行操作。并且,这几个方法都是 `final` 修饰的,在子类中无法被重写。 ```java //返回同步状态的当前值 protected final int getState() { - return state; + return state; } -//设置同步状态的值 + // 设置同步状态的值 protected final void setState(int newState) { - state = newState; + state = newState; } //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值) protected final boolean compareAndSetState(int expect, int update) { - return unsafe.compareAndSwapInt(this, stateOffset, expect, update); + return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ``` -#### AQS 对资源的共享方式 +以 `ReentrantLock` 为例,`state` 初始值为 0,表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()` 到 `state=`0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(`state` 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。 -**AQS 定义两种资源共享方式** +再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后`countDown()` 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。 -- **Exclusive**(独占):只有一个线程能执行,如 `ReentrantLock`。又可分为公平锁和非公平锁: - - 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁 - - 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的 -- **Share**(共享):多个线程可同时执行,如` CountDownLatch`、`Semaphore`、 `CyclicBarrier`、`ReadWriteLock` 我们都会在后面讲到。 +### Semaphore 有什么用? -`ReentrantReadWriteLock` 可以看成是组合式,因为 `ReentrantReadWriteLock` 也就是读写锁允许多个线程同时对某一资源进行读。 +`synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。 -不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。 +Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到贡献资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。 -#### AQS 底层使用了模板方法模式 - -同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用): - -1. 使用者继承 `AbstractQueuedSynchronizer` 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放) -2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。 - -这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。 - -**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:** - -```java -protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 -protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 -protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。 -protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 +```java +// 初始共享资源数量 +final Semaphore semaphore = new Semaphore(5); +// 获取1个许可 +semaphore.acquire(); +// 释放1个许可 +semaphore.release(); ``` -**什么是钩子方法呢?** 钩子方法是一种被声明在抽象类中的方法,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。 +当初始的资源个数为 1 的时候,`Semaphore` 退化为排他锁。 -除了上面提到的钩子方法之外,AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。 +`Semaphore` 有两种模式:。 -以 `ReentrantLock` 为例,state 初始化为 0,表示未锁定状态。A 线程 `lock()` 时,会调用 `tryAcquire()` 独占该锁并将 `state+1` 。此后,其他线程再 `tryAcquire()` 时就会失败,直到 A 线程 `unlock()` 到 `state=`0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。 +- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO; +- **非公平模式:** 抢占式的。 -再以 `CountDownLatch` 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后` countDown()` 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 `state=0` ),会 `unpark()` 主调用线程,然后主调用线程就会从 `await()` 函数返回,继续后余动作。 +`Semaphore` 对应的两个构造方法如下: -一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现`tryAcquire-tryRelease`、`tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`。 +```java +public Semaphore(int permits) { + sync = new NonfairSync(permits); +} -推荐两篇 AQS 原理和相关源码分析的文章: +public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); +} +``` -- https://www.cnblogs.com/waterystone/p/4920797.html -- https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html +**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。** -### AQS 常用的组件有哪些? +`Semaphore` 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。 -- **`Semaphore`(信号量)-允许多个线程同时访问:** `synchronized` 和 `ReentrantLock` 都是一次只允许一个线程访问某个资源,`Semaphore`(信号量)可以指定多个线程同时访问某个资源。 -- **`CountDownLatch `(倒计时器):** `CountDownLatch` 是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。 -- **`CyclicBarrier`(循环栅栏):** `CyclicBarrier` 和 `CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。`CyclicBarrier` 的字面意思是可循环使用(`Cyclic`)的屏障(`Barrier`)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。 +### Semaphore 的原理是什么? +`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。 +调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state >= 0` 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果 `state<0` 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。 + +```java +/** + * 获取1个许可证 + */ +public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} +/** + * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 + */ +public final void acquireSharedInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + // 尝试获取许可证,arg为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。 + if (tryAcquireShared(arg) < 0) + doAcquireSharedInterruptibly(arg); +} +``` + +调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state>=0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。 + +```java +// 释放一个许可证 +public void release() { + sync.releaseShared(1); +} + +// 释放共享锁,同时会唤醒同步队列中的一个线程。 +public final boolean releaseShared(int arg) { + //释放共享锁 + if (tryReleaseShared(arg)) { + //唤醒同步队列中的一个线程 + doReleaseShared(); + return true; + } + return false; +} +``` + +### CountDownLatch 有什么用? + +`CountDownLatch` 允许 `count` 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。 + +`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。 + +### CountDownLatch 的原理是什么? + +`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。 ### 用过 CountDownLatch 么?什么场景下用的? @@ -625,12 +532,143 @@ List> fileFutures = filePaths.stream() CompletableFuture allFutures = CompletableFuture.allOf( fileFutures.toArray(new CompletableFuture[fileFutures.size()]) ); +``` +### CyclicBarrier 有什么用? + +`CyclicBarrier` 和 `CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。 + +> `CountDownLatch` 的实现是基于 AQS 的,而 `CycliBarrier` 是基于 `ReentrantLock`(`ReentrantLock` 也属于 AQS 同步器)和 `Condition` 的。 + +`CyclicBarrier` 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。 + +### CyclicBarrier 的原理是什么? + +`CyclicBarrier` 内部通过一个 `count` 变量作为计数器,`count` 的初始值为 `parties` 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。 + +```java +//每次拦截的线程数 +private final int parties; +//计数器 +private int count; +``` + +下面我们结合源码来简单看看。 + +1、`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。 + +```java +public CyclicBarrier(int parties) { + this(parties, null); +} + +public CyclicBarrier(int parties, Runnable barrierAction) { + if (parties <= 0) throw new IllegalArgumentException(); + this.parties = parties; + this.count = parties; + this.barrierCommand = barrierAction; +} +``` + +其中,`parties` 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。 + +2、当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。 + +```java +public int await() throws InterruptedException, BrokenBarrierException { + try { + return dowait(false, 0L); + } catch (TimeoutException toe) { + throw new Error(toe); // cannot happen + } +} +``` + +`dowait(false, 0L)`方法源码分析如下: + +```java + // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。 + private int count; + /** + * Main barrier code, covering the various policies. + */ + private int dowait(boolean timed, long nanos) + throws InterruptedException, BrokenBarrierException, + TimeoutException { + final ReentrantLock lock = this.lock; + // 锁住 + lock.lock(); + try { + final Generation g = generation; + + if (g.broken) + throw new BrokenBarrierException(); + + // 如果线程中断了,抛出异常 + if (Thread.interrupted()) { + breakBarrier(); + throw new InterruptedException(); + } + // cout减1 + int index = --count; + // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件 + if (index == 0) { // tripped + boolean ranAction = false; + try { + final Runnable command = barrierCommand; + if (command != null) + command.run(); + ranAction = true; + // 将 count 重置为 parties 属性的初始化值 + // 唤醒之前等待的线程 + // 下一波执行开始 + nextGeneration(); + return 0; + } finally { + if (!ranAction) + breakBarrier(); + } + } + + // loop until tripped, broken, interrupted, or timed out + for (;;) { + try { + if (!timed) + trip.await(); + else if (nanos > 0L) + nanos = trip.awaitNanos(nanos); + } catch (InterruptedException ie) { + if (g == generation && ! g.broken) { + breakBarrier(); + throw ie; + } else { + // We're about to finish waiting even if we had not + // been interrupted, so this interrupt is deemed to + // "belong" to subsequent execution. + Thread.currentThread().interrupt(); + } + } + + if (g.broken) + throw new BrokenBarrierException(); + + if (g != generation) + return index; + + if (timed && nanos <= 0L) { + breakBarrier(); + throw new TimeoutException(); + } + } + } finally { + lock.unlock(); + } + } ``` ## 参考 - 《深入理解 Java 虚拟机》 - 《实战 Java 高并发程序设计》 -- Java并发之AQS详解:https://www.cnblogs.com/waterystone/p/4920797.html -- Java并发包基石-AQS详解:https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html +- Java 并发之 AQS 详解:https://www.cnblogs.com/waterystone/p/4920797.html +- Java 并发包基石-AQS 详解:https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html diff --git a/docs/java/concurrent/java-thread-pool-summary.md b/docs/java/concurrent/java-thread-pool-summary.md index 39f36968..2010b48d 100644 --- a/docs/java/concurrent/java-thread-pool-summary.md +++ b/docs/java/concurrent/java-thread-pool-summary.md @@ -5,11 +5,9 @@ tag: - Java并发 --- +## 1 使用线程池的好处 - -## 一 使用线程池的好处 - -> **池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。** +池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。 **线程池**提供了一种限制和管理资源(包括执行一个任务)的方式。 每个**线程池**还维护一些基本统计信息,例如已完成任务的数量。 @@ -19,7 +17,7 @@ tag: - **提高响应速度**。当任务到达时,任务可以不需要等到线程创建就能立即执行。 - **提高线程的可管理性**。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 -## 二 Executor 框架 +## 2 Executor 框架 ### 2.1 简介 @@ -76,7 +74,7 @@ public class ScheduledThreadPoolExecutor 3. **如果执行 `ExecutorService.submit(…)`,`ExecutorService` 将返回一个实现`Future`接口的对象**(我们刚刚也提到过了执行 `execute()`方法和 `submit()`方法的区别,`submit()`会返回一个 `FutureTask 对象)。由于 FutureTask` 实现了 `Runnable`,我们也可以创建 `FutureTask`,然后直接交给 `ExecutorService` 执行。 4. **最后,主线程可以执行 `FutureTask.get()`方法来等待任务执行完成。主线程也可以执行 `FutureTask.cancel(boolean mayInterruptIfRunning)`来取消此任务的执行。** -## 三 (重要)ThreadPoolExecutor 类简单介绍 +## 3 (重要)ThreadPoolExecutor 类简单介绍 **线程池实现类 `ThreadPoolExecutor` 是 `Executor` 框架最核心的类。** @@ -173,7 +171,7 @@ public class ScheduledThreadPoolExecutor ![通过Executor 框架的工具类Executors来实现](./images/java-thread-pool-summary/Executors工具类.png) -## 四 ThreadPoolExecutor 使用+原理分析 +## 4 ThreadPoolExecutor 使用+原理分析 我们上面讲解了 `Executor`框架以及 `ThreadPoolExecutor` 类,下面让我们实战一下,来通过写一个 `ThreadPoolExecutor` 的小 Demo 来回顾上面的内容。 @@ -657,7 +655,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4 Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5 ``` -## 五 几种常见的线程池详解 +## 5 几种常见的线程池详解 ### 5.1 FixedThreadPool @@ -798,7 +796,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5 `CachedThreadPool`允许创建的线程数量为 `Integer.MAX_VALUE` ,可能会创建大量线程,从而导致 OOM。 -## 六 ScheduledThreadPoolExecutor 详解 +## 6 ScheduledThreadPoolExecutor 详解 **`ScheduledThreadPoolExecutor` 主要用来在给定的延迟后运行任务,或者定期执行任务。** 这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下它的思想即可。 @@ -840,7 +838,7 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5 3. 线程 1 修改 `ScheduledFutureTask` 的 time 变量为下次将要被执行的时间; 4. 线程 1 把这个修改 time 之后的 `ScheduledFutureTask` 放回 `DelayQueue` 中(`DelayQueue.add()`)。 -## 七 线程池大小确定 +## 7 线程池大小确定 **线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。** @@ -869,14 +867,9 @@ Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5 CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。 -## 八 参考 +## 8 参考 - 《Java 并发编程的艺术》 - [Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example](https://www.journaldev.com/2340/java-scheduler-scheduledexecutorservice-scheduledthreadpoolexecutor-example "Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example") - [java.util.concurrent.ScheduledThreadPoolExecutor Example](https://examples.javacodegeeks.com/core-java/util/concurrent/scheduledthreadpoolexecutor/java-util-concurrent-scheduledthreadpoolexecutor-example/ "java.util.concurrent.ScheduledThreadPoolExecutor Example") - [ThreadPoolExecutor – Java Thread Pool Example](https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice "ThreadPoolExecutor – Java Thread Pool Example") - -## 九 其他推荐阅读 - -- [Java 并发(三)线程池原理](https://www.cnblogs.com/warehouse/p/10720781.html "Java并发(三)线程池原理") -- [如何优雅的使用和理解线程池](https://github.com/crossoverJie/JCSprout/blob/master/MD/ThreadPoolExecutor.md "如何优雅的使用和理解线程池")