一、核心概念
CompletableFuture 是Java 8引入的异步编程工具,是对 Future 的增强,支持:
- 异步回调:任务完成后自动触发回调
- 任务编排:链式调用,组合多个异步操作
- 异常处理:完善的异常传播机制
- 手动完成:可以主动设置结果
二、与传统Future的对比
1. 传统Future的局限
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "结果";
});
// 问题1:只能阻塞等待结果
String result = future.get(); // 阻塞2秒
// 问题2:无法设置回调
// 问题3:无法组合多个Future
// 问题4:异常处理不友好
2. CompletableFuture的优势
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); } catch (InterruptedException e) {}
return "结果";
});
// 优势1:非阻塞回调
future.thenAccept(result -> {
System.out.println("收到结果:" + result);
});
// 优势2:链式调用
future
.thenApply(r -> r + " 处理1")
.thenApply(r -> r + " 处理2")
.thenAccept(System.out::println);
// 优势3:异常处理
future
.thenApply(this::process)
.exceptionally(ex -> "默认值");
// 优势4:组合多个Future
CompletableFuture<String> future1 = ...;
CompletableFuture<String> future2 = ...;
CompletableFuture.allOf(future1, future2).join();
三、底层数据结构
1. 核心字段
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// 任务结果(成功或异常)
volatile Object result;
// 回调链表头节点
volatile Completion stack;
// 默认异步执行器
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
}
关键点:
result:存储任务结果或异常(AltResult包装)stack:维护一个 Completion链表(栈结构),存储所有等待结果的回调
2. Completion 回调节点
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 链表的下一个节点
abstract CompletableFuture<?> tryFire(int mode); // 触发回调
abstract boolean isLive();
}
// 典型子类
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn; // 转换函数
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<T> d = dep;
CompletableFuture<V> f = this.src;
Object r;
if ((r = f.result) != null) { // 依赖任务已完成
// 执行fn.apply(r),将结果设置到dep
d.completeValue(fn.apply(r));
}
return d;
}
}
Completion类型:
UniApply:thenApply的回调UniAccept:thenAccept的回调UniRun:thenRun的回调BiApply:thenCombine的回调(依赖两个Future)OrApply:applyToEither的回调(两个Future之一完成即触发)
3. 回调链表结构
// 示例:future.thenApply(f1).thenApply(f2).thenAccept(f3)
CompletableFuture (原始)
↓
result = null
stack → UniApply(f1) → UniApply(f2) → UniAccept(f3) → null
↓ ↓ ↓
新Future1 新Future2 新Future3
// 当原始Future完成时,依次触发stack链表中的Completion
四、核心方法实现原理
1. supplyAsync - 异步执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
CompletableFuture<U> d = new CompletableFuture<>();
e.execute(new AsyncSupply<U>(d, f)); // 提交到线程池
return d;
}
// AsyncSupply任务
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep;
Supplier<T> fn;
public void run() {
try {
T result = fn.get(); // 执行supplier
dep.completeValue(result); // 设置结果并触发回调链
} catch (Throwable ex) {
dep.completeThrowable(ex); // 设置异常
}
}
}
流程:
- 创建一个新的
CompletableFuture对象 - 将
Supplier包装成AsyncSupply任务 - 提交到线程池(默认ForkJoinPool.commonPool)
- 任务执行完成后,调用
completeValue设置结果并触发回调
2. thenApply - 转换结果
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn); // null表示在完成线程中同步执行
}
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn); // 异步执行
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
CompletableFuture<V> d = new CompletableFuture<>(); // 新Future
// 情况1:当前Future已完成
if (result != null) {
if (e != null) {
e.execute(() -> d.completeValue(f.apply(result))); // 异步执行
} else {
d.completeValue(f.apply(result)); // 同步执行
}
}
// 情况2:当前Future未完成,添加回调到stack
else {
UniApply<T,V> c = new UniApply<>(e, d, this, f);
push(c); // 添加到回调链表
c.tryFire(SYNC); // 尝试立即执行(可能已完成)
}
return d;
}
// 推入回调栈
final void push(Completion c) {
do {
c.next = stack; // CAS操作
} while (!UNSAFE.compareAndSwapObject(this, STACK, c.next, c));
}
关键点:
- 如果当前Future 已完成,直接执行转换函数
- 如果当前Future 未完成,创建
UniApply回调节点,推入stack链表 - 使用CAS操作保证线程安全(无锁)
3. complete - 设置结果并触发回调链
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete(); // 触发回调链
return triggered;
}
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t); // CAS设置结果
}
// 触发回调链
final void postComplete() {
CompletableFuture<?> f = this;
Completion h;
// 遍历stack链表,依次触发回调
while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d;
Completion t;
// CAS移除头节点
if (UNSAFE.compareAndSwapObject(f, STACK, h, t = h.next)) {
if (t != null) {
// 将next节点也加入栈(优化)
UNSAFE.compareAndSwapObject(h, NEXT, t, null);
}
f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发回调
}
}
}
流程:
- 使用CAS设置
result字段 - 遍历
stack链表,依次触发每个Completion - 每个
Completion的tryFire方法会:- 获取依赖的Future的结果
- 执行转换/消费函数
- 设置新Future的结果
- 递归触发下游回调
4. thenCombine - 组合两个Future
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o, BiFunction<T,U,V> f) {
CompletableFuture<V> d = new CompletableFuture<>();
CompletableFuture<U> other = o.toCompletableFuture();
// 情况1:两个Future都已完成
if (result != null && other.result != null) {
d.completeValue(f.apply(result, other.result));
}
// 情况2:至少一个未完成,创建BiApply回调
else {
BiApply<T,U,V> c = new BiApply<>(e, d, this, other, f);
// 同时添加到两个Future的回调链
this.push(c);
other.push(c);
c.tryFire(SYNC);
}
return d;
}
// BiApply回调节点
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
BiFunction<? super T,? super U,? extends V> fn;
final CompletableFuture<V> tryFire(int mode) {
Object r, s;
// 必须两个依赖都完成
if ((r = src.result) != null && (s = snd.result) != null) {
V result = fn.apply(r, s); // 执行组合函数
dep.completeValue(result);
}
return dep;
}
}
关键点:
BiApply会被同时添加到两个Future的回调链- 任一Future完成时,都会尝试触发
tryFire - 只有当两个Future 都完成 时,才真正执行组合函数
5. exceptionally - 异常处理
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
private CompletableFuture<T> uniExceptionallyStage(Function<Throwable, ? extends T> f) {
CompletableFuture<T> d = new CompletableFuture<>();
// 已完成
if (result != null) {
if (result instanceof AltResult) { // 异常结果
Throwable ex = ((AltResult) result).ex;
try {
d.completeValue(f.apply(ex)); // 执行异常处理函数
} catch (Throwable ex2) {
d.completeThrowable(ex2);
}
} else {
d.completeValue(result); // 正常结果,直接传递
}
}
// 未完成,添加回调
else {
UniExceptionally<T> c = new UniExceptionally<>(d, this, f);
push(c);
}
return d;
}
// 异常结果包装类
static final class AltResult {
final Throwable ex; // 存储异常
}
异常传播机制:
- 任务执行出现异常时,包装成
AltResult存储在result - 后续的
thenApply、thenAccept等不会执行,直接传递异常 - 遇到
exceptionally或handle时,才处理异常
五、线程模型
1. 默认执行器
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// 备用执行器(单核机器)
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); // 每个任务一个线程
}
}
执行模式:
supplyAsync()/thenApplyAsync():使用asyncPool执行thenApply()/thenAccept():在 完成任务的线程 中同步执行- 可以自定义
Executor
2. 执行模式对比
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Task: " + Thread.currentThread().getName());
return "result";
});
// 同步模式:在完成future的线程中执行(可能是ForkJoinPool线程)
future.thenApply(r -> {
System.out.println("ThenApply: " + Thread.currentThread().getName());
return r.toUpperCase();
});
// 异步模式:提交到线程池异步执行
future.thenApplyAsync(r -> {
System.out.println("ThenApplyAsync: " + Thread.currentThread().getName());
return r.toUpperCase();
});
// 自定义执行器
Executor customExecutor = Executors.newFixedThreadPool(5);
future.thenApplyAsync(r -> {
System.out.println("Custom: " + Thread.currentThread().getName());
return r.toUpperCase();
}, customExecutor);
六、关键实现技巧
1. CAS无锁操作
// 设置结果
UNSAFE.compareAndSwapObject(this, RESULT, null, value)
// 修改回调栈
UNSAFE.compareAndSwapObject(this, STACK, c.next, c)
优势:
- 避免加锁,提升性能
- 多线程安全地修改
result和stack
2. 栈式回调链(LIFO)
// 添加回调时:后添加的在栈顶
push(completion1); // stack → completion1 → null
push(completion2); // stack → completion2 → completion1 → null
// 触发回调时:从栈顶开始(LIFO)
postComplete(); // 先触发completion2,再触发completion1
原因:
- 简化实现(栈的push/pop比队列简单)
- 回调顺序对大多数场景无影响
3. 复用ForkJoinTask
abstract static class Completion extends ForkJoinTask<Void> {
// Completion继承ForkJoinTask
}
优势:
- 可以直接提交到
ForkJoinPool执行 - 复用
ForkJoinTask的调度机制
七、使用示例
1. 基本使用
// 异步计算
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
// 转换结果
CompletableFuture<String> result = future
.thenApply(num -> num * 2) // 200
.thenApply(num -> "结果: " + num); // "结果: 200"
// 消费结果
result.thenAccept(System.out::println);
2. 组合多个Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
// 等待两个都完成
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
return s1 + " " + s2; // "Hello World"
});
// 任一完成即返回
CompletableFuture<String> either = future1.applyToEither(future2, s -> s);
3. 异常处理
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("出错了");
}
return "成功";
})
.thenApply(s -> s + " 处理")
.exceptionally(ex -> {
System.out.println("捕获异常:" + ex.getMessage());
return "默认值";
})
.thenAccept(System.out::println);
4. 批量任务
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> httpClient.get(url)))
.collect(Collectors.toList());
// 等待所有完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.thenRun(() -> {
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("所有请求完成:" + results);
});
八、面试答题总结
简洁版回答:
CompletableFuture 的底层实现基于以下核心机制:
1. 数据结构
result:存储任务结果或异常(使用AltResult包装异常)stack:维护一个 Completion回调链表(栈结构),存储所有等待结果的回调
2. 回调机制
- 调用
thenApply等方法时,创建对应的Completion节点(如UniApply) - 使用 CAS操作 将节点推入
stack链表(无锁) - 任务完成时,遍历
stack链表,依次触发每个Completion的tryFire方法
3. 线程模型
- 默认使用
ForkJoinPool.commonPool()作为执行器 supplyAsync/thenApplyAsync:异步执行(提交到线程池)thenApply/thenAccept:同步执行(在完成任务的线程中执行)
4. 核心流程
提交任务 → 执行 → 设置结果(CAS) → 触发回调链(postComplete)
↓
遍历stack,依次执行Completion
↓
每个Completion触发下游回调
5. 关键技术
- CAS无锁操作:保证
result和stack的线程安全 - 栈式回调链:简化实现,LIFO顺序触发
- 复用ForkJoinTask:
Completion继承ForkJoinTask,复用调度机制
6. 异常处理
- 异常包装成
AltResult存储 - 异常会沿着回调链传播,直到遇到
exceptionally或handle处理
典型应用:异步任务编排、并行处理、非阻塞IO等场景。