一、什么是多线程编排
多线程编排指的是协调多个异步任务的执行顺序和依赖关系,常见场景包括:
- 串行执行:任务A → 任务B → 任务C
- 并行执行后汇总:任务A、B、C并行,等待全部完成后汇总
- 任一完成即返回:任务A、B、C并行,任一完成即可
- 复杂依赖:任务D依赖A和B,任务E依赖C和D
二、方案一:CompletableFuture(推荐)
1. 串行执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
return "结果A";
})
.thenApplyAsync(resultA -> {
System.out.println("执行任务B,依赖:" + resultA);
return "结果B";
})
.thenApplyAsync(resultB -> {
System.out.println("执行任务C,依赖:" + resultB);
return "结果C";
});
String finalResult = future.join(); // 等待完成
System.out.println("最终结果:" + finalResult);
输出:
执行任务A
执行任务B,依赖:结果A
执行任务C,依赖:结果B
最终结果:结果C
2. 并行执行后汇总
方案1:allOf + 收集结果
// 创建多个异步任务
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务1结果";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "任务2结果";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
sleep(800);
return "任务3结果";
});
// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(task1, task2, task3);
// 汇总结果
CompletableFuture<List<String>> allResults = allOf.thenApply(v -> {
return Arrays.asList(
task1.join(),
task2.join(),
task3.join()
);
});
List<String> results = allResults.join();
System.out.println("所有结果:" + results);
方案2:thenCombine(两个任务组合)
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");
// 组合两个任务的结果
CompletableFuture<String> combined = task1.thenCombine(task2, (r1, r2) -> {
return r1 + " + " + r2;
});
System.out.println(combined.join()); // "任务1 + 任务2"
方案3:多任务组合(自定义工具)
public class CompletableFutureUtil {
/**
* 并行执行多个任务并收集结果
*/
public static <T> CompletableFuture<List<T>> sequence(
List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
return allDone.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
}
// 使用
List<CompletableFuture<String>> tasks = Arrays.asList(
CompletableFuture.supplyAsync(() -> "任务1"),
CompletableFuture.supplyAsync(() -> "任务2"),
CompletableFuture.supplyAsync(() -> "任务3")
);
List<String> results = CompletableFutureUtil.sequence(tasks).join();
System.out.println(results);
3. 任一完成即返回
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "任务1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "任务3";
});
// 任一完成即返回
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task1, task2, task3);
System.out.println("最快完成的:" + anyOf.join()); // "任务2"
应用场景:
- 多个数据源查询,取最快返回的
- 多个算法并行计算,取最快的结果
4. 复杂依赖编排
/**
* 场景:电商下单流程
* 1. 并行查询:用户信息、商品信息
* 2. 依赖1:计算价格(需要商品信息)
* 3. 依赖1+2:生成订单(需要用户信息和价格)
* 4. 依赖3:扣减库存(需要订单信息)
*/
public CompletableFuture<Order> createOrder(Long userId, Long productId) {
// 任务1:查询用户信息
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> {
return userService.getUser(userId);
});
// 任务2:查询商品信息
CompletableFuture<Product> productFuture = CompletableFuture.supplyAsync(() -> {
return productService.getProduct(productId);
});
// 任务3:计算价格(依赖商品信息)
CompletableFuture<BigDecimal> priceFuture = productFuture.thenApply(product -> {
return priceService.calculatePrice(product);
});
// 任务4:生成订单(依赖用户信息和价格)
CompletableFuture<Order> orderFuture = userFuture
.thenCombine(priceFuture, (user, price) -> {
Order order = new Order();
order.setUserId(user.getId());
order.setPrice(price);
return orderRepository.save(order);
});
// 任务5:扣减库存(依赖订单)
CompletableFuture<Order> finalFuture = orderFuture.thenApply(order -> {
inventoryService.deduct(productId, 1);
return order;
});
return finalFuture;
}
// 使用
createOrder(123L, 456L)
.thenAccept(order -> System.out.println("订单创建成功:" + order))
.exceptionally(ex -> {
System.out.println("订单创建失败:" + ex.getMessage());
return null;
});
5. 异常处理与重试
public CompletableFuture<String> robustTask() {
return CompletableFuture.supplyAsync(() -> {
// 可能失败的任务
if (Math.random() > 0.7) {
throw new RuntimeException("任务失败");
}
return "成功";
})
.exceptionally(ex -> {
// 异常处理
log.error("任务失败,使用降级逻辑", ex);
return "降级结果";
})
.thenApply(result -> {
// 后续处理
return "处理后的" + result;
});
}
// 重试机制
public CompletableFuture<String> retryTask(int maxRetries) {
return CompletableFuture.supplyAsync(() -> {
return executeWithRetry(maxRetries);
});
}
private String executeWithRetry(int maxRetries) {
for (int i = 0; i < maxRetries; i++) {
try {
return doTask();
} catch (Exception e) {
if (i == maxRetries - 1) {
throw e;
}
sleep(1000 * (i + 1)); // 指数退避
}
}
throw new RuntimeException("重试失败");
}
三、方案二:CountDownLatch
适用于 主线程等待多个子线程完成 的场景。
基本用法
public void processWithCountDownLatch() throws InterruptedException {
int taskCount = 3;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(3);
// 任务1
executor.execute(() -> {
try {
System.out.println("任务1执行");
Thread.sleep(1000);
} finally {
latch.countDown(); // 计数器-1
}
});
// 任务2
executor.execute(() -> {
try {
System.out.println("任务2执行");
Thread.sleep(1500);
} finally {
latch.countDown();
}
});
// 任务3
executor.execute(() -> {
try {
System.out.println("任务3执行");
Thread.sleep(800);
} finally {
latch.countDown();
}
});
// 等待所有任务完成
latch.await(); // 阻塞直到计数器归零
System.out.println("所有任务完成");
executor.shutdown();
}
带超时的等待
boolean finished = latch.await(5, TimeUnit.SECONDS);
if (!finished) {
System.out.println("超时,部分任务未完成");
}
优点:
- 简单直观
- 可以设置超时
缺点:
- 无法获取任务结果
- 无法处理任务异常
- 一次性使用,不可重置
四、方案三:CyclicBarrier
适用于 多个线程相互等待,达到同步点后一起继续 的场景。
基本用法
public void processWithCyclicBarrier() {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程到达屏障,开始汇总");
});
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
executor.execute(() -> {
try {
System.out.println("任务" + taskId + "准备阶段");
Thread.sleep(1000);
// 等待其他线程到达
barrier.await();
System.out.println("任务" + taskId + "继续执行");
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
多阶段任务
public void multiPhaseTask() throws Exception {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
executor.execute(() -> {
try {
// 阶段1
System.out.println("任务" + taskId + " - 阶段1");
barrier.await(); // 等待所有线程完成阶段1
// 阶段2
System.out.println("任务" + taskId + " - 阶段2");
barrier.await(); // 等待所有线程完成阶段2
// 阶段3
System.out.println("任务" + taskId + " - 阶段3");
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
优点:
- 可重复使用(CyclicBarrier可以reset)
- 支持回调(barrierAction)
缺点:
- 必须固定数量的线程
- 无法获取任务结果
五、方案四:Phaser(Java 7+)
Phaser是CyclicBarrier和CountDownLatch的增强版,支持动态调整参与线程数。
基本用法
public void processWithPhaser() {
Phaser phaser = new Phaser(1); // 主线程注册
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
phaser.register(); // 动态注册
executor.execute(() -> {
try {
System.out.println("任务" + taskId + "执行");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
phaser.arriveAndDeregister(); // 完成并注销
}
});
}
phaser.arriveAndAwaitAdvance(); // 等待所有任务完成
System.out.println("所有任务完成");
executor.shutdown();
}
多阶段任务
public void multiPhaseWithPhaser() {
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("阶段" + phase + "完成,参与者:" + registeredParties);
return phase >= 2; // 阶段2后终止
}
};
phaser.register(); // 主线程注册
for (int i = 0; i < 3; i++) {
final int taskId = i;
phaser.register();
new Thread(() -> {
// 阶段0
System.out.println("任务" + taskId + " - 阶段0");
phaser.arriveAndAwaitAdvance();
// 阶段1
System.out.println("任务" + taskId + " - 阶段1");
phaser.arriveAndAwaitAdvance();
// 阶段2
System.out.println("任务" + taskId + " - 阶段2");
phaser.arriveAndAwaitAdvance();
}).start();
}
phaser.arriveAndDeregister(); // 主线程完成
}
六、方案五:Future + 队列
适用于 生产者-消费者 模型。
public void processWithFuture() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<String>> futures = new ArrayList<>();
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "任务" + taskId + "结果";
});
futures.add(future);
}
// 收集结果
for (Future<String> future : futures) {
String result = future.get(); // 阻塞等待
System.out.println(result);
}
executor.shutdown();
}
七、实战案例:并行查询多个数据源
@Service
public class DataAggregationService {
@Autowired
private UserService userService;
@Autowired
private OrderService orderService;
@Autowired
private ProductService productService;
private final ExecutorService executor = Executors.newFixedThreadPool(10);
/**
* 聚合用户的完整信息
*/
public UserDetail aggregateUserDetail(Long userId) {
// 并行查询多个数据源
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
() -> userService.getUser(userId), executor
);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
() -> orderService.getUserOrders(userId), executor
);
CompletableFuture<List<Product>> favoritesFuture = CompletableFuture.supplyAsync(
() -> productService.getUserFavorites(userId), executor
);
CompletableFuture<Integer> pointsFuture = CompletableFuture.supplyAsync(
() -> userService.getUserPoints(userId), executor
);
// 等待所有查询完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(
userFuture, ordersFuture, favoritesFuture, pointsFuture
);
// 汇总结果
return allOf.thenApply(v -> {
UserDetail detail = new UserDetail();
detail.setUser(userFuture.join());
detail.setOrders(ordersFuture.join());
detail.setFavorites(favoritesFuture.join());
detail.setPoints(pointsFuture.join());
return detail;
}).join(); // 阻塞等待最终结果
}
}
八、方案对比
| 方案 | 适用场景 | 可获取结果 | 可处理异常 | 复杂度 | 推荐度 |
|---|---|---|---|---|---|
| CompletableFuture | 所有异步编排场景 | ✅ | ✅ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| CountDownLatch | 主线程等待多个子线程 | ❌ | ❌ | ⭐ | ⭐⭐⭐ |
| CyclicBarrier | 多线程相互等待 | ❌ | ❌ | ⭐⭐ | ⭐⭐⭐ |
| Phaser | 动态参与者的多阶段任务 | ❌ | ❌ | ⭐⭐⭐ | ⭐⭐ |
| Future | 简单异步任务 | ✅ | ⚠️ | ⭐ | ⭐⭐ |
九、面试答题总结
简洁版回答:
Java中多线程编排有以下几种主流方案:
1. CompletableFuture(推荐)
- 串行:
thenApply、thenCompose - 并行汇总:
allOf+ 收集结果 - 任一完成:
anyOf - 组合:
thenCombine(两个任务) - 支持异常处理(
exceptionally)和自定义执行器
2. CountDownLatch
- 主线程等待多个子线程完成
- 使用
countDown()递减,await()等待 - 一次性使用,无法获取结果
3. CyclicBarrier
- 多个线程相互等待,到达同步点后一起继续
- 可重复使用,支持回调
- 适合多阶段任务
4. Phaser
- CyclicBarrier的增强版,支持动态调整参与者
- 适合复杂的多阶段协调
选择建议:
- 优先使用 CompletableFuture:功能最强大,支持结果获取和异常处理
- 简单等待场景:CountDownLatch
- 多阶段协调:CyclicBarrier或Phaser
- 复杂依赖关系:CompletableFuture的组合方法
典型应用:电商下单流程、数据聚合查询、批量任务处理等。