completable-future学习笔记

CompletableFuture 学习笔记

文章链接:https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html

利用java8的CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行步骤1 ");
return "Step 1";
});

CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行步骤2 ");
return "Step 2";
});

cf1.thenCombine(cf2, (res1, res2) -> {
System.out.println("结果1: " + res1 + "结果2: " + res2);
System.out.println("执行步骤3");
return "Step 3";
}).thenAccept((res3) -> {
System.out.println("第三步结果: " + res3);
});
}
}

阅读文章之后顺手跑了跑这个demo,输出结果如下:

1
2
3
4
5
执行步骤1 
执行步骤2
结果1: Step 1结果2: Step 2
执行步骤3
第三步结果: Step 3

这个demo大致描述了一个这样的调用逻辑

graph TD
    A[开始] --> B[步骤1]
    A --> C[步骤2]
    B --> D[步骤3]
    C --> D

下边是一些CompletableFuture的常用一些组合任务的方式:

  1. thenApply,同步组合结果
1
2
3
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "ABC").thenApply(String::toLowerCase);

System.out.println(completableFuture.get());

输出

1
abc

这个主要用于对异步结果做简单处理。
2. thenAccept消费结果

1
2
3
4
CompletableFuture
.supplyAsync(() -> "ABC")
.thenApply(e -> e+"1")
.thenAccept(System.out::println);

输出

1
ABC1
  1. thenCompose将两个任务串联
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("step 1");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "123";
});

CompletableFuture<String> cf2 = cf1.thenCompose(res -> {
System.out.println("res1: " + res);
System.out.println("step 2");
return CompletableFuture.supplyAsync(() -> "456");
});

String finalans = cf2.join();
System.out.println("finalans: " + finalans);
}
1
2
3
4
step 1
res1: 123
step 2
finalans: 456
  1. thenCombine并行执行,和最开始的demo是一样的
  2. allOf所有任务都完成就执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务1");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 123;
});

CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务2");
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 456;
});

CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务3");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 789;
});
//等待所有任务完成
long begin = System.currentTimeMillis();
CompletableFuture.allOf(cf1, cf2, cf3).thenAccept(System.out::println).join();
long end = System.currentTimeMillis();

System.out.println(end - begin);
}

看到最后输出大致是3000左右,确实是等待了所有任务完成才结束的。
6. anyOf任意任务完成就执行
代码与allOf demo是一样,只需要换一下方法就可以。这样子的输出就是1000左右的millsecond。

现在有一个想法,是手动建立异步调用链,最后再触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.CompletableFuture;

import static java.lang.Thread.sleep;

public class ZeroDependency {
public static void main(String[] args) {
CompletableFuture<Object> trigger = new CompletableFuture<>();

CompletableFuture<Void> chain = trigger
.thenCompose(v ->
CompletableFuture.supplyAsync(() -> {
System.out.println("run task1");
try {
sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "123";
}))
.thenCompose((res1) -> {
System.out.println("task1 结果: " + res1);
//这里不需要下一个节点来执行任务了,所以用的是runAsync而不是supplyAsync
return CompletableFuture.runAsync(() -> {
System.out.println("run task2");
try {
sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
})
.thenRun(() -> {
System.out.println("All done!");
});
//手动触发异步链条
trigger.complete(null);
//等待chain执行完成,**不要等待trigger,等待trigger会提前结束
chain.join();
}
}

看起来不太友好。。

再来看一看CompleteFuture的源码,它的内部保存了一个Completion

1
2
3
4
5
6
7
8
volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions

//这里是一个链栈
abstract static class Completion {
volatile Completion next; // Next dependent action
//other code...
}

至于文章中提到的如何保证并发安全,可以直接去查看一下thenApply()中调用的uniApplyStage()方法。

1
2
3
4
5
6
7
8
9
10
11
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

这里首先进行了push(c),会不断检查result尝试将c压栈直到成功。

1
2
3
4
5
6
final void push(UniCompletion<?,?> c) {
if (c != null) {
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}

假设在压栈之前,当前这个c的依赖已经执行完成了,那么后续会在tryFire()方法中尝试执行。

一些小知识

直接从文章中复制的

同步方法(即不带Async后缀的方法)有两种情况。

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

异步方法(即带Async后缀的方法):

可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。

异步回调要传线程池

前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

异步RPC调用注意不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。


completable-future学习笔记
http://example.com/2026/01/22/completable-future/
作者
Soya
发布于
2026年1月22日
许可协议