From ce6daf112e212d6dd5993441614e865eef1167ff Mon Sep 17 00:00:00 2001 From: guide Date: Wed, 1 Sep 2021 18:41:05 +0800 Subject: [PATCH] =?UTF-8?q?Create=20CompletableFuture=E4=BB=8E=E5=85=A5?= =?UTF-8?q?=E9=97=A8=E5=88=B0=E5=AE=9E=E6=88=98.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CompletableFuture从入门到实战.md | 492 ++++++++++++++++++ 1 file changed, 492 insertions(+) create mode 100644 docs/java/multi-thread/CompletableFuture从入门到实战.md diff --git a/docs/java/multi-thread/CompletableFuture从入门到实战.md b/docs/java/multi-thread/CompletableFuture从入门到实战.md new file mode 100644 index 00000000..485588bd --- /dev/null +++ b/docs/java/multi-thread/CompletableFuture从入门到实战.md @@ -0,0 +1,492 @@ +自己在项目中使用 `CompletableFuture` 比较多,看到很多开源框架中也大量使用到了 `CompletableFuture` 。 + +因此,专门写一篇文章来介绍这个 Java 8 才被引入的一个非常有用的用于异步编程的类。 + +## 简单介绍 + +`CompletableFuture` 同时实现了 `Future` 和 `CompletionStage` 接口。其除了提供了更为好用和强大的 `Future` 特性之外,还提供了函数式编程的能力。 + +```java +public class CompletableFuture implements Future, CompletionStage { +} +``` + +## 常见操作 + +### 创建 CompletableFuture + +常见的创建 `CompletableFuture` 对象的方法如下: + +1. 通过 new 关键字。 +2. 基于 `CompletableFuture` 自带的静态工厂方法:`runAsync()` 、`supplyAsync()` 。 + +#### new 关键字 + +通过 new 关键字创建 `CompletableFuture` 对象这种使用方式可以看作是将 `CompletableFuture` 当做 `Future` 来使用。 + +我在我的开源项目 [guide-rpc-framework](https://github.com/Snailclimb/guide-rpc-framework) 中就是这种方式创建的 `CompletableFuture` 对象。 + +下面咱们来看一个简单的案例。 + +我们通过创建了一个结果值类型为 `RpcResponse` 的 `CompletableFuture`,你可以把 `resultFuture` 看作是异步运算结果的载体。 + +```java +CompletableFuture> resultFuture = new CompletableFuture<>(); +``` + +假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 `complete()` 方法为其传入结果,这表示 `resultFuture` 已经被完成了。 + +```java +// complete() 方法只能调用一次,后续调用将被忽略。 +resultFuture.complete(rpcResponse); +``` + +你可以通过 `isDone()` 方法来检查是否已经完成。 + +```java +public boolean isDone() { + return result != null; +} +``` + +获取异步计算的结果也非常简单,直接调用 `get()` 方法即可! + +```java +rpcResponse = completableFuture.get(); +``` + +注意 : `get()` 方法并不会阻塞,因为我们已经知道异步运算的结果了。 + +如果你已经知道计算的结果的话,可以使用静态方法 `completedFuture()` 来创建 `CompletableFuture` 。 + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!"); +assertEquals("hello!", future.get()); +``` + +`completedFuture()` 方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。 + +```java +public static CompletableFuture completedFuture(U value) { + return new CompletableFuture((value == null) ? NIL : value); +} +``` + +#### 静态工厂方法 + +这两个方法可以帮助我们封装计算逻辑。 + +```java +static CompletableFuture supplyAsync(Supplier supplier); +// 使用自定义线程池(推荐) +static CompletableFuture supplyAsync(Supplier supplier, Executor executor); +static CompletableFuture runAsync(Runnable runnable); +// 使用自定义线程池(推荐) +static CompletableFuture runAsync(Runnable runnable, Executor executor); +``` + +`runAsync()` 方法接受的参数是 `Runnable` ,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 `runAsync()` 方法。 + +```java +@FunctionalInterface +public interface Runnable { + public abstract void run(); +} +``` + +`supplyAsync()` 方法接受的参数是 `Supplier` ,这也是一个函数式接口,`U` 是返回结果值的类型。 + +```java +@FunctionalInterface +public interface Supplier { + + /** + * Gets a result. + * + * @return a result + */ + T get(); +} +``` + +当你需要异步操作且关心返回结果的时候,可以使用 `supplyAsync()` 方法。 + +```java +CompletableFuture future = CompletableFuture.runAsync(() -> System.out.println("hello!")); +future.get();// 输出 "hello!" +CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "hello!"); +assertEquals("hello!", future2.get()); +``` + +### 处理异步结算的结果 + +当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个: + +- `thenApply()` +- `thenAccept()` +- `thenRun()` +- `whenComplete()` + +`thenApply()` 方法接受一个 `Function` 实例,用它来处理结果。 + +```java +// 沿用上一个任务的线程池 +public CompletableFuture thenApply( + Function fn) { + return uniApplyStage(null, fn); +} + +//使用默认的 ForkJoinPool 线程池(不推荐) +public CompletableFuture thenApplyAsync( + Function fn) { + return uniApplyStage(defaultExecutor(), fn); +} +// 使用自定义线程池(推荐) +public CompletableFuture thenApplyAsync( + Function fn, Executor executor) { + return uniApplyStage(screenExecutor(executor), fn); +} +``` + +`thenApply()` 方法使用示例如下: + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!"); +assertEquals("hello!world!", future.get()); +// 这次调用将被忽略。 +future.thenApply(s -> s + "nice!"); +assertEquals("hello!world!", future.get()); +``` + +你还可以进行 **流式调用**: + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!"); +assertEquals("hello!world!nice!", future.get()); +``` + +**如果你不需要从回调函数中获取返回结果,可以使用 `thenAccept()` 或者 `thenRun()`。这两个方法的区别在于 `thenRun()` 不能访问异步计算的结果。** + +`thenAccept()` 方法的参数是 `Consumer` 。 + +```java +public CompletableFuture thenAccept(Consumer action) { + return uniAcceptStage(null, action); +} + +public CompletableFuture thenAcceptAsync(Consumer action) { + return uniAcceptStage(defaultExecutor(), action); +} + +public CompletableFuture thenAcceptAsync(Consumer action, + Executor executor) { + return uniAcceptStage(screenExecutor(executor), action); +} +``` + +顾名思义,`Consumer` 属于消费型接口,它可以接收 1 个输入对象然后进行“消费”。 + +```java +@FunctionalInterface +public interface Consumer { + + void accept(T t); + + default Consumer andThen(Consumer after) { + Objects.requireNonNull(after); + return (T t) -> { accept(t); after.accept(t); }; + } +} +``` + +`thenRun()` 的方法是的参数是 `Runnable` 。 + +```java +public CompletableFuture thenRun(Runnable action) { + return uniRunStage(null, action); +} + +public CompletableFuture thenRunAsync(Runnable action) { + return uniRunStage(defaultExecutor(), action); +} + +public CompletableFuture thenRunAsync(Runnable action, + Executor executor) { + return uniRunStage(screenExecutor(executor), action); +} +``` + +`thenAccept()` 和 `thenRun()` 使用示例如下: + +```java +CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice! + +CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello! +``` + +`whenComplete()` 的方法的参数是 `BiConsumer` 。 + +```java +public CompletableFuture whenComplete( + BiConsumer action) { + return uniWhenCompleteStage(null, action); +} + + +public CompletableFuture whenCompleteAsync( + BiConsumer action) { + return uniWhenCompleteStage(defaultExecutor(), action); +} +// 使用自定义线程池(推荐) +public CompletableFuture whenCompleteAsync( + BiConsumer action, Executor executor) { + return uniWhenCompleteStage(screenExecutor(executor), action); +} +``` + +相对于 `Consumer` , `BiConsumer` 可以接收 2 个输入对象然后进行“消费”。 + +```java +@FunctionalInterface +public interface BiConsumer { + void accept(T t, U u); + + default BiConsumer andThen(BiConsumer after) { + Objects.requireNonNull(after); + + return (l, r) -> { + accept(l, r); + after.accept(l, r); + }; + } +} +``` + +`whenComplete()` 使用示例如下: + +```java +CompletableFuture future = CompletableFuture.supplyAsync(() -> "hello!") + .whenComplete((res, ex) -> { + // res 代表返回的结果 + // ex 的类型为 Throwable ,代表抛出的异常 + System.out.println(res); + // 这里没有抛出异常所有为 null + assertNull(ex); + }); +assertEquals("hello!", future.get()); +``` + +### 异常处理 + +你可以通过 `handle()` 方法来处理任务执行过程中可能出现的抛出异常的情况。 + +```java +public CompletableFuture handle( + BiFunction fn) { + return uniHandleStage(null, fn); +} + +public CompletableFuture handleAsync( + BiFunction fn) { + return uniHandleStage(defaultExecutor(), fn); +} + +public CompletableFuture handleAsync( + BiFunction fn, Executor executor) { + return uniHandleStage(screenExecutor(executor), fn); +} +``` + +示例代码如下: + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> { + if (true) { + throw new RuntimeException("Computation error!"); + } + return "hello!"; +}).handle((res, ex) -> { + // res 代表返回的结果 + // ex 的类型为 Throwable ,代表抛出的异常 + return res != null ? res : "world!"; +}); +assertEquals("world!", future.get()); +``` + +你还可以通过 `exceptionally()` 方法来处理异常情况。 + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> { + if (true) { + throw new RuntimeException("Computation error!"); + } + return "hello!"; +}).exceptionally(ex -> { + System.out.println(ex.toString());// CompletionException + return "world!"; +}); +assertEquals("world!", future.get()); +``` + +如果你想让 `CompletableFuture` 的结果就是异常的话,可以使用 `completeExceptionally()` 方法为其赋值。 + +```java +CompletableFuture completableFuture = new CompletableFuture<>(); +// ... +completableFuture.completeExceptionally( + new RuntimeException("Calculation failed!")); +// ... +completableFuture.get(); // ExecutionException +``` + +### 组合 CompletableFuture + +你可以使用 `thenCompose()` 按顺序链接两个 `CompletableFuture` 对象。 + +```java +public CompletableFuture thenCompose( + Function> fn) { + return uniComposeStage(null, fn); +} + +public CompletableFuture thenComposeAsync( + Function> fn) { + return uniComposeStage(defaultExecutor(), fn); +} + +public CompletableFuture thenComposeAsync( + Function> fn, + Executor executor) { + return uniComposeStage(screenExecutor(executor), fn); +} +``` + +`thenCompose()` 方法会使用示例如下: + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> "hello!") + .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!")); +assertEquals("hello!world!", future.get()); +``` + +在实际开发中,这个方法还是非常有用的。比如说,我们先要获取用户信息然后再用用户信息去做其他事情。 + +和 `thenCompose()` 方法类似的还有 `thenCombine()` 方法, `thenCombine()` 同样可以组合两个 `CompletableFuture` 对象。 + +```java +CompletableFuture completableFuture + = CompletableFuture.supplyAsync(() -> "hello!") + .thenCombine(CompletableFuture.supplyAsync( + () -> "world!"), (s1, s2) -> s1 + s2) + .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "nice!")); +assertEquals("hello!world!nice!", completableFuture.get()); +``` + +**那 `thenCompose()` 和 `thenCombine()` 有什么区别呢?** + +- `thenCompose()` 可以两个 `CompletableFuture` 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。 +- `thenCombine()` 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。 + +### 并行运行多个 CompletableFuture + +你可以通过 `CompletableFuture` 的 `allOf()`这个静态方法来并行运行多个 `CompletableFuture` 。 + +实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。 + +比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 `CompletableFuture` 来处理。 + +示例代码如下: + +```java +CompletableFuture task1 = + CompletableFuture.supplyAsync(()->{ + //自定义业务操作 + }); +...... +CompletableFuture task6 = + CompletableFuture.supplyAsync(()->{ + //自定义业务操作 + }); +...... + CompletableFuture headerFuture=CompletableFuture.allOf(task1,.....,task6); + + try { + headerFuture.join(); + } catch (Exception ex) { + ...... + } +System.out.println("all done. "); +``` + +经常和 `allOf()` 方法拿来对比的是 `anyOf()` 方法。 + +**`allOf()` 方法会等到所有的 `CompletableFuture` 都运行完成之后再返回** + +```java +Random rand = new Random(); +CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000 + rand.nextInt(1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("future1 done..."); + } + return "abc"; +}); +CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000 + rand.nextInt(1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("future2 done..."); + } + return "efg"; +}); +``` + +调用 `join()` 可以让程序等`future1` 和 `future2` 都运行完了之后再继续执行。 + +```java +CompletableFuture completableFuture = CompletableFuture.allOf(future1, future2); +completableFuture.join(); +assertTrue(completableFuture.isDone()); +System.out.println("all futures done..."); +``` + +输出: + +```java +future1 done... +future2 done... +all futures done... +``` + +**`anyOf()` 方法不会等待所有的 `CompletableFuture` 都运行完成之后再返回,只要有一个执行完成即可!** + +```java +CompletableFuture f = CompletableFuture.anyOf(future1, future2); +System.out.println(f.get()); +``` + +输出结果可能是: + +```java +future2 done... +efg +``` + +也可能是: + +``` +future1 done... +abc +```