一、问题分析

线程池的设计初衷是并发执行任务以提高性能,但某些业务场景需要保证任务的顺序执行

  • 数据库事务操作有先后依赖
  • 状态机流转必须按顺序
  • 日志记录需要保持时序
  • 消息处理需要保证顺序性

二、方案一:单线程线程池(最简单)

1. 使用 newSingleThreadExecutor

ExecutorService executor = Executors.newSingleThreadExecutor();

executor.submit(() -> System.out.println("任务1"));
executor.submit(() -> System.out.println("任务2"));
executor.submit(() -> System.out.println("任务3"));

// 输出保证是:任务1 -> 任务2 -> 任务3

原理

  • 只有一个工作线程
  • 任务放入队列后按提交顺序执行
  • 保证任务串行执行

2. 手动创建单线程池(推荐)

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    1,  // corePoolSize = 1
    1,  // maximumPoolSize = 1
    0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(1000),  // 有界队列
    new ThreadFactoryBuilder()
        .setNameFormat("sequential-task-%d")
        .build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 任务提交
executor.execute(() -> process("任务1"));
executor.execute(() -> process("任务2"));
executor.execute(() -> process("任务3"));

优点

  • 实现简单,逻辑清晰
  • 天然保证顺序性
  • 适合任务量不大的场景

缺点

  • 无法利用多核优势
  • 吞吐量低
  • 一个任务阻塞会影响后续所有任务

三、方案二:按业务维度分组顺序执行

对于需要部分顺序、部分并发的场景(如订单处理:同一用户的订单需要顺序,不同用户可以并发)。

实现方式:哈希分片 + 多个单线程池

public class PartitionedSequentialExecutor {
    private final ThreadPoolExecutor[] executors;
    private final int partitionCount;
    
    public PartitionedSequentialExecutor(int partitionCount) {
        this.partitionCount = partitionCount;
        this.executors = new ThreadPoolExecutor[partitionCount];
        
        for (int i = 0; i < partitionCount; i++) {
            executors[i] = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(500),
                new ThreadFactoryBuilder()
                    .setNameFormat("partition-" + i + "-%d")
                    .build(),
                new ThreadPoolExecutor.CallerRunsPolicy()
            );
        }
    }
    
    /**
     * 根据key分配到固定的线程池执行
     * 同一个key的任务保证顺序执行
     */
    public void execute(String key, Runnable task) {
        int partition = Math.abs(key.hashCode()) % partitionCount;
        executors[partition].execute(task);
    }
    
    public void shutdown() {
        for (ThreadPoolExecutor executor : executors) {
            executor.shutdown();
        }
    }
}

// 使用示例
PartitionedSequentialExecutor executor = new PartitionedSequentialExecutor(8);

// 同一用户的订单保证顺序
executor.execute("user123", () -> processOrder("订单1"));
executor.execute("user123", () -> processOrder("订单2"));
executor.execute("user123", () -> processOrder("订单3"));

// 不同用户的订单可以并发
executor.execute("user456", () -> processOrder("订单A"));
executor.execute("user789", () -> processOrder("订单B"));

适用场景

  • 同一用户的操作需要顺序(如订单、账户操作)
  • 消息队列的顺序消费(按分区消费)
  • 数据库分片场景的事务处理

优点

  • 兼顾顺序性和并发性
  • 充分利用多核CPU
  • 性能优于单线程池

四、方案三:使用 Future 串联任务

通过 Future.get() 阻塞等待前一个任务完成。

ExecutorService executor = Executors.newFixedThreadPool(10);

Future<?> future1 = executor.submit(() -> {
    System.out.println("执行任务1");
    Thread.sleep(1000);
    return "结果1";
});

// 等待任务1完成
future1.get();

Future<?> future2 = executor.submit(() -> {
    System.out.println("执行任务2,使用前一个任务的结果");
    return "结果2";
});

future2.get();

Future<?> future3 = executor.submit(() -> {
    System.out.println("执行任务3");
    return "结果3";
});

future3.get();

缺点

  • 阻塞主线程,没有充分利用异步优势
  • 代码不优雅

五、方案四:使用 CompletableFuture 链式调用

利用 CompletableFuturethenComposethenApply 等方法实现任务编排。

1. 基本串行执行

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1");
    return "结果1";
}, executor)
.thenApplyAsync(result -> {
    System.out.println("任务2,使用:" + result);
    return "结果2";
}, executor)
.thenApplyAsync(result -> {
    System.out.println("任务3,使用:" + result);
    return "结果3";
}, executor)
.thenAccept(result -> {
    System.out.println("最终结果:" + result);
});

// 不阻塞主线程
Thread.sleep(5000);

2. 复杂依赖编排

// 任务A
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行任务A");
    return "A结果";
}, executor);

// 任务B,依赖任务A
CompletableFuture<String> taskB = taskA.thenApplyAsync(aResult -> {
    System.out.println("执行任务B,使用:" + aResult);
    return "B结果";
}, executor);

// 任务C,依赖任务B
CompletableFuture<String> taskC = taskB.thenApplyAsync(bResult -> {
    System.out.println("执行任务C,使用:" + bResult);
    return "C结果";
}, executor);

// 等待最终结果
String finalResult = taskC.get();

3. 有序批量处理

public class SequentialProcessor {
    
    private final ExecutorService executor;
    
    public SequentialProcessor(ExecutorService executor) {
        this.executor = executor;
    }
    
    /**
     * 顺序执行一批任务
     */
    public CompletableFuture<Void> executeSequentially(List<Runnable> tasks) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        
        for (Runnable task : tasks) {
            chain = chain.thenRunAsync(task, executor);
        }
        
        return chain;
    }
}

// 使用示例
List<Runnable> tasks = Arrays.asList(
    () -> System.out.println("任务1"),
    () -> System.out.println("任务2"),
    () -> System.out.println("任务3")
);

SequentialProcessor processor = new SequentialProcessor(executor);
processor.executeSequentially(tasks).join();  // 等待全部完成

优点

  • 异步非阻塞,不阻塞主线程
  • 代码优雅,链式调用
  • 支持复杂的任务依赖关系
  • 异常处理机制完善(exceptionallyhandle

六、方案五:使用 BlockingQueue 手动控制

自己实现一个顺序消费者。

public class SequentialTaskExecutor {
    
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;
    
    public SequentialTaskExecutor() {
        // 启动消费线程
        executor.execute(() -> {
            while (running || !taskQueue.isEmpty()) {
                try {
                    Runnable task = taskQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
    
    public void submit(Runnable task) {
        taskQueue.offer(task);
    }
    
    public void shutdown() {
        running = false;
        executor.shutdown();
    }
}

// 使用示例
SequentialTaskExecutor executor = new SequentialTaskExecutor();

executor.submit(() -> System.out.println("任务1"));
executor.submit(() -> System.out.println("任务2"));
executor.submit(() -> System.out.println("任务3"));

Thread.sleep(2000);
executor.shutdown();

七、方案六:使用 Guava 的 SerialExecutor

Google Guava库提供了一个内部类 SerialExecutor

// Guava的实现思路
public class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active;
    
    public SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    
    @Override
    public synchronized void execute(Runnable r) {
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        });
        
        if (active == null) {
            scheduleNext();
        }
    }
    
    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}

// 使用示例
Executor threadPool = Executors.newFixedThreadPool(10);
SerialExecutor serialExecutor = new SerialExecutor(threadPool);

serialExecutor.execute(() -> System.out.println("任务1"));
serialExecutor.execute(() -> System.out.println("任务2"));
serialExecutor.execute(() -> System.out.println("任务3"));

原理

  • 任务提交到队列
  • 同一时刻只有一个任务在底层线程池中执行
  • 任务完成后自动调度下一个

八、方案对比

方案 顺序性 性能 复杂度 适用场景
单线程池 ✅ 完全顺序 ⭐ 简单 任务量小,简单场景
分区线程池 ✅ 分组顺序 ⭐⭐⭐⭐ ⭐⭐⭐ 中等 按业务分组顺序(如用户维度)
Future串联 ✅ 完全顺序 ⭐⭐ ⭐⭐ 简单 阻塞式顺序,不推荐
CompletableFuture ✅ 完全顺序 ⭐⭐⭐⭐ ⭐⭐⭐ 中等 异步编排,推荐
BlockingQueue ✅ 完全顺序 ⭐⭐⭐ ⭐⭐⭐⭐ 复杂 自定义控制需求
SerialExecutor ✅ 完全顺序 ⭐⭐⭐ ⭐⭐⭐ 中等 复用现有线程池

九、实战案例:订单顺序处理

@Service
public class OrderProcessService {
    
    // 按用户ID分区,保证同一用户的订单顺序处理
    private final PartitionedSequentialExecutor executor 
        = new PartitionedSequentialExecutor(16);
    
    /**
     * 处理订单(异步)
     */
    public void processOrderAsync(Order order) {
        String userId = order.getUserId();
        
        executor.execute(userId, () -> {
            try {
                // 1. 扣减库存
                inventoryService.deduct(order);
                
                // 2. 创建订单
                orderRepository.save(order);
                
                // 3. 扣减账户余额
                accountService.deduct(order.getUserId(), order.getAmount());
                
                // 4. 发送通知
                notificationService.send(order.getUserId(), "订单创建成功");
                
                log.info("订单处理完成:{}", order.getOrderNo());
            } catch (Exception e) {
                log.error("订单处理失败:{}", order.getOrderNo(), e);
                // 回滚或补偿逻辑
            }
        });
    }
}

十、注意事项

1. 异常处理

executor.execute(() -> {
    try {
        // 任务逻辑
        processTask();
    } catch (Exception e) {
        log.error("任务执行失败", e);
        // 不要让异常终止消费线程
    }
});

2. 任务超时控制

Future<?> future = executor.submit(task);

try {
    future.get(10, TimeUnit.SECONDS);  // 超时控制
} catch (TimeoutException e) {
    future.cancel(true);  // 取消任务
    log.error("任务超时");
}

3. 优雅关闭

executor.shutdown();  // 不再接受新任务

try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();  // 强制关闭
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
}

十一、面试答题总结

简洁版回答

让线程池顺序执行任务有以下几种方案:

1. 单线程线程池(最简单)

  • 使用 corePoolSize = 1 的线程池
  • 任务按提交顺序串行执行
  • 适合任务量小的场景

2. 分区顺序执行(推荐,兼顾性能)

  • 按业务key(如用户ID)哈希分配到固定线程池
  • 同一key的任务顺序执行,不同key可以并发
  • 适合有分组需求的场景(如订单处理)

3. CompletableFuture链式调用(推荐,异步优雅)

  • 使用 thenApplyAsyncthenRunAsync 等方法串联任务
  • 异步非阻塞,代码优雅
  • 适合复杂的任务依赖编排

4. 自定义SerialExecutor

  • 使用队列存储任务,同一时刻只执行一个
  • 复用底层多线程池
  • 适合需要复用现有线程池的场景

选择建议

  • 简单场景:单线程池
  • 高性能场景:分区线程池
  • 复杂编排:CompletableFuture
  • 自定义需求:BlockingQueue + 消费者模式