CompletableFuture 学习笔记
文章链接:https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
利用java8的CompletableFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.util.concurrent.CompletableFuture; public class Main { public static void main(String[] args) throws InterruptedException { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行步骤1 "); return "Step 1"; });
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行步骤2 "); return "Step 2"; });
cf1.thenCombine(cf2, (res1, res2) -> { System.out.println("结果1: " + res1 + "结果2: " + res2); System.out.println("执行步骤3"); return "Step 3"; }).thenAccept((res3) -> { System.out.println("第三步结果: " + res3); }); } }
|
阅读文章之后顺手跑了跑这个demo,输出结果如下:
1 2 3 4 5
| 执行步骤1 执行步骤2 结果1: Step 1结果2: Step 2 执行步骤3 第三步结果: Step 3
|
这个demo大致描述了一个这样的调用逻辑
graph TD
A[开始] --> B[步骤1]
A --> C[步骤2]
B --> D[步骤3]
C --> D
下边是一些CompletableFuture的常用一些组合任务的方式:
thenApply,同步组合结果
1 2 3
| CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "ABC").thenApply(String::toLowerCase);
System.out.println(completableFuture.get());
|
输出
这个主要用于对异步结果做简单处理。
2. thenAccept消费结果
1 2 3 4
| CompletableFuture .supplyAsync(() -> "ABC") .thenApply(e -> e+"1") .thenAccept(System.out::println);
|
输出
thenCompose将两个任务串联
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("step 1"); try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } return "123"; });
CompletableFuture<String> cf2 = cf1.thenCompose(res -> { System.out.println("res1: " + res); System.out.println("step 2"); return CompletableFuture.supplyAsync(() -> "456"); });
String finalans = cf2.join(); System.out.println("finalans: " + finalans); }
|
1 2 3 4
| step 1 res1: 123 step 2 finalans: 456
|
thenCombine并行执行,和最开始的demo是一样的
allOf所有任务都完成就执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public static void main(String[] args) { CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("执行任务1"); try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } return 123; });
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("执行任务2"); try { Thread.sleep(2000L); } catch (InterruptedException e) { throw new RuntimeException(e); } return 456; });
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> { System.out.println("执行任务3"); try { Thread.sleep(3000L); } catch (InterruptedException e) { throw new RuntimeException(e); } return 789; }); long begin = System.currentTimeMillis(); CompletableFuture.allOf(cf1, cf2, cf3).thenAccept(System.out::println).join(); long end = System.currentTimeMillis(); System.out.println(end - begin); }
|
看到最后输出大致是3000左右,确实是等待了所有任务完成才结束的。
6. anyOf任意任务完成就执行
代码与allOf demo是一样,只需要换一下方法就可以。这样子的输出就是1000左右的millsecond。
现在有一个想法,是手动建立异步调用链,最后再触发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import java.util.concurrent.CompletableFuture;
import static java.lang.Thread.sleep;
public class ZeroDependency { public static void main(String[] args) { CompletableFuture<Object> trigger = new CompletableFuture<>();
CompletableFuture<Void> chain = trigger .thenCompose(v -> CompletableFuture.supplyAsync(() -> { System.out.println("run task1"); try { sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "123"; })) .thenCompose((res1) -> { System.out.println("task1 结果: " + res1); return CompletableFuture.runAsync(() -> { System.out.println("run task2"); try { sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); }) .thenRun(() -> { System.out.println("All done!"); }); trigger.complete(null); chain.join(); } }
|
看起来不太友好。。
再来看一看CompleteFuture的源码,它的内部保存了一个Completion栈
1 2 3 4 5 6 7 8
| volatile Object result; volatile Completion stack;
abstract static class Completion { volatile Completion next; }
|
至于文章中提到的如何保证并发安全,可以直接去查看一下thenApply()中调用的uniApplyStage()方法。
1 2 3 4 5 6 7 8 9 10 11
| private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; }
|
这里首先进行了push(c),会不断检查result尝试将c压栈直到成功。
1 2 3 4 5 6
| final void push(UniCompletion<?,?> c) { if (c != null) { while (result == null && !tryPushStack(c)) lazySetNext(c, null); } }
|
假设在压栈之前,当前这个c的依赖已经执行完成了,那么后续会在tryFire()方法中尝试执行。
一些小知识
直接从文章中复制的
同步方法(即不带Async后缀的方法)有两种情况。
- 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
- 如果注册时被依赖的操作还未执行完,则由回调线程执行。
异步方法(即带Async后缀的方法):
可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。
异步回调要传线程池
前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。
当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。
异步RPC调用注意不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。