一、问题分析
线程池的设计初衷是并发执行任务以提高性能,但某些业务场景需要保证任务的顺序执行:
- 数据库事务操作有先后依赖
- 状态机流转必须按顺序
- 日志记录需要保持时序
- 消息处理需要保证顺序性
二、方案一:单线程线程池(最简单)
1. 使用 newSingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> System.out.println("任务1"));
executor.submit(() -> System.out.println("任务2"));
executor.submit(() -> System.out.println("任务3"));
// 输出保证是:任务1 -> 任务2 -> 任务3
原理:
- 只有一个工作线程
- 任务放入队列后按提交顺序执行
- 保证任务串行执行
2. 手动创建单线程池(推荐)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, // corePoolSize = 1
1, // maximumPoolSize = 1
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000), // 有界队列
new ThreadFactoryBuilder()
.setNameFormat("sequential-task-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 任务提交
executor.execute(() -> process("任务1"));
executor.execute(() -> process("任务2"));
executor.execute(() -> process("任务3"));
优点:
- 实现简单,逻辑清晰
- 天然保证顺序性
- 适合任务量不大的场景
缺点:
- 无法利用多核优势
- 吞吐量低
- 一个任务阻塞会影响后续所有任务
三、方案二:按业务维度分组顺序执行
对于需要部分顺序、部分并发的场景(如订单处理:同一用户的订单需要顺序,不同用户可以并发)。
实现方式:哈希分片 + 多个单线程池
public class PartitionedSequentialExecutor {
private final ThreadPoolExecutor[] executors;
private final int partitionCount;
public PartitionedSequentialExecutor(int partitionCount) {
this.partitionCount = partitionCount;
this.executors = new ThreadPoolExecutor[partitionCount];
for (int i = 0; i < partitionCount; i++) {
executors[i] = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(500),
new ThreadFactoryBuilder()
.setNameFormat("partition-" + i + "-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
/**
* 根据key分配到固定的线程池执行
* 同一个key的任务保证顺序执行
*/
public void execute(String key, Runnable task) {
int partition = Math.abs(key.hashCode()) % partitionCount;
executors[partition].execute(task);
}
public void shutdown() {
for (ThreadPoolExecutor executor : executors) {
executor.shutdown();
}
}
}
// 使用示例
PartitionedSequentialExecutor executor = new PartitionedSequentialExecutor(8);
// 同一用户的订单保证顺序
executor.execute("user123", () -> processOrder("订单1"));
executor.execute("user123", () -> processOrder("订单2"));
executor.execute("user123", () -> processOrder("订单3"));
// 不同用户的订单可以并发
executor.execute("user456", () -> processOrder("订单A"));
executor.execute("user789", () -> processOrder("订单B"));
适用场景:
- 同一用户的操作需要顺序(如订单、账户操作)
- 消息队列的顺序消费(按分区消费)
- 数据库分片场景的事务处理
优点:
- 兼顾顺序性和并发性
- 充分利用多核CPU
- 性能优于单线程池
四、方案三:使用 Future 串联任务
通过 Future.get() 阻塞等待前一个任务完成。
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<?> future1 = executor.submit(() -> {
System.out.println("执行任务1");
Thread.sleep(1000);
return "结果1";
});
// 等待任务1完成
future1.get();
Future<?> future2 = executor.submit(() -> {
System.out.println("执行任务2,使用前一个任务的结果");
return "结果2";
});
future2.get();
Future<?> future3 = executor.submit(() -> {
System.out.println("执行任务3");
return "结果3";
});
future3.get();
缺点:
- 阻塞主线程,没有充分利用异步优势
- 代码不优雅
五、方案四:使用 CompletableFuture 链式调用
利用 CompletableFuture 的 thenCompose、thenApply 等方法实现任务编排。
1. 基本串行执行
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1");
return "结果1";
}, executor)
.thenApplyAsync(result -> {
System.out.println("任务2,使用:" + result);
return "结果2";
}, executor)
.thenApplyAsync(result -> {
System.out.println("任务3,使用:" + result);
return "结果3";
}, executor)
.thenAccept(result -> {
System.out.println("最终结果:" + result);
});
// 不阻塞主线程
Thread.sleep(5000);
2. 复杂依赖编排
// 任务A
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
return "A结果";
}, executor);
// 任务B,依赖任务A
CompletableFuture<String> taskB = taskA.thenApplyAsync(aResult -> {
System.out.println("执行任务B,使用:" + aResult);
return "B结果";
}, executor);
// 任务C,依赖任务B
CompletableFuture<String> taskC = taskB.thenApplyAsync(bResult -> {
System.out.println("执行任务C,使用:" + bResult);
return "C结果";
}, executor);
// 等待最终结果
String finalResult = taskC.get();
3. 有序批量处理
public class SequentialProcessor {
private final ExecutorService executor;
public SequentialProcessor(ExecutorService executor) {
this.executor = executor;
}
/**
* 顺序执行一批任务
*/
public CompletableFuture<Void> executeSequentially(List<Runnable> tasks) {
CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
for (Runnable task : tasks) {
chain = chain.thenRunAsync(task, executor);
}
return chain;
}
}
// 使用示例
List<Runnable> tasks = Arrays.asList(
() -> System.out.println("任务1"),
() -> System.out.println("任务2"),
() -> System.out.println("任务3")
);
SequentialProcessor processor = new SequentialProcessor(executor);
processor.executeSequentially(tasks).join(); // 等待全部完成
优点:
- 异步非阻塞,不阻塞主线程
- 代码优雅,链式调用
- 支持复杂的任务依赖关系
- 异常处理机制完善(
exceptionally、handle)
六、方案五:使用 BlockingQueue 手动控制
自己实现一个顺序消费者。
public class SequentialTaskExecutor {
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private volatile boolean running = true;
public SequentialTaskExecutor() {
// 启动消费线程
executor.execute(() -> {
while (running || !taskQueue.isEmpty()) {
try {
Runnable task = taskQueue.poll(100, TimeUnit.MILLISECONDS);
if (task != null) {
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
public void submit(Runnable task) {
taskQueue.offer(task);
}
public void shutdown() {
running = false;
executor.shutdown();
}
}
// 使用示例
SequentialTaskExecutor executor = new SequentialTaskExecutor();
executor.submit(() -> System.out.println("任务1"));
executor.submit(() -> System.out.println("任务2"));
executor.submit(() -> System.out.println("任务3"));
Thread.sleep(2000);
executor.shutdown();
七、方案六:使用 Guava 的 SerialExecutor
Google Guava库提供了一个内部类 SerialExecutor。
// Guava的实现思路
public class SerialExecutor implements Executor {
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final Executor executor;
private Runnable active;
public SerialExecutor(Executor executor) {
this.executor = executor;
}
@Override
public synchronized void execute(Runnable r) {
tasks.add(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
scheduleNext();
}
}
private synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
// 使用示例
Executor threadPool = Executors.newFixedThreadPool(10);
SerialExecutor serialExecutor = new SerialExecutor(threadPool);
serialExecutor.execute(() -> System.out.println("任务1"));
serialExecutor.execute(() -> System.out.println("任务2"));
serialExecutor.execute(() -> System.out.println("任务3"));
原理:
- 任务提交到队列
- 同一时刻只有一个任务在底层线程池中执行
- 任务完成后自动调度下一个
八、方案对比
| 方案 | 顺序性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 单线程池 | ✅ 完全顺序 | ⭐ | ⭐ 简单 | 任务量小,简单场景 |
| 分区线程池 | ✅ 分组顺序 | ⭐⭐⭐⭐ | ⭐⭐⭐ 中等 | 按业务分组顺序(如用户维度) |
| Future串联 | ✅ 完全顺序 | ⭐⭐ | ⭐⭐ 简单 | 阻塞式顺序,不推荐 |
| CompletableFuture | ✅ 完全顺序 | ⭐⭐⭐⭐ | ⭐⭐⭐ 中等 | 异步编排,推荐 |
| BlockingQueue | ✅ 完全顺序 | ⭐⭐⭐ | ⭐⭐⭐⭐ 复杂 | 自定义控制需求 |
| SerialExecutor | ✅ 完全顺序 | ⭐⭐⭐ | ⭐⭐⭐ 中等 | 复用现有线程池 |
九、实战案例:订单顺序处理
@Service
public class OrderProcessService {
// 按用户ID分区,保证同一用户的订单顺序处理
private final PartitionedSequentialExecutor executor
= new PartitionedSequentialExecutor(16);
/**
* 处理订单(异步)
*/
public void processOrderAsync(Order order) {
String userId = order.getUserId();
executor.execute(userId, () -> {
try {
// 1. 扣减库存
inventoryService.deduct(order);
// 2. 创建订单
orderRepository.save(order);
// 3. 扣减账户余额
accountService.deduct(order.getUserId(), order.getAmount());
// 4. 发送通知
notificationService.send(order.getUserId(), "订单创建成功");
log.info("订单处理完成:{}", order.getOrderNo());
} catch (Exception e) {
log.error("订单处理失败:{}", order.getOrderNo(), e);
// 回滚或补偿逻辑
}
});
}
}
十、注意事项
1. 异常处理
executor.execute(() -> {
try {
// 任务逻辑
processTask();
} catch (Exception e) {
log.error("任务执行失败", e);
// 不要让异常终止消费线程
}
});
2. 任务超时控制
Future<?> future = executor.submit(task);
try {
future.get(10, TimeUnit.SECONDS); // 超时控制
} catch (TimeoutException e) {
future.cancel(true); // 取消任务
log.error("任务超时");
}
3. 优雅关闭
executor.shutdown(); // 不再接受新任务
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
十一、面试答题总结
简洁版回答:
让线程池顺序执行任务有以下几种方案:
1. 单线程线程池(最简单)
- 使用
corePoolSize = 1的线程池 - 任务按提交顺序串行执行
- 适合任务量小的场景
2. 分区顺序执行(推荐,兼顾性能)
- 按业务key(如用户ID)哈希分配到固定线程池
- 同一key的任务顺序执行,不同key可以并发
- 适合有分组需求的场景(如订单处理)
3. CompletableFuture链式调用(推荐,异步优雅)
- 使用
thenApplyAsync、thenRunAsync等方法串联任务 - 异步非阻塞,代码优雅
- 适合复杂的任务依赖编排
4. 自定义SerialExecutor
- 使用队列存储任务,同一时刻只执行一个
- 复用底层多线程池
- 适合需要复用现有线程池的场景
选择建议:
- 简单场景:单线程池
- 高性能场景:分区线程池
- 复杂编排:CompletableFuture
- 自定义需求:BlockingQueue + 消费者模式