问题

Stream的并行流一定比串行流更快吗?

答案

结论

并行流不一定更快,甚至在很多场景下反而更慢。是否使用并行流需要根据数据规模、计算复杂度、数据结构特性、线程池竞争等因素综合判断。

并行流原理

底层实现

并行流基于 ForkJoinPool 实现任务拆分与合并:

// 串行流
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
list.stream()
    .map(x -> x * 2)
    .collect(Collectors.toList());

// 并行流
list.parallelStream()
    .map(x -> x * 2)
    .collect(Collectors.toList());

并行执行过程

原始数据 [1,2,3,4,5,6,7,8]
    ↓ 拆分(split)
[1,2,3,4]  [5,6,7,8]
    ↓ 并行计算
[2,4,6,8]  [10,12,14,16]
    ↓ 合并(combine)
[2,4,6,8,10,12,14,16]

默认线程池

// 默认使用公共ForkJoinPool
ForkJoinPool commonPool = ForkJoinPool.commonPool();

// 并行度 = CPU核心数 - 1
int parallelism = Runtime.getRuntime().availableProcessors() - 1;

并行流性能陷阱

1. 数据规模过小

问题:任务拆分、线程调度、结果合并的开销超过计算本身。

// 数据量小时,并行流更慢
@Benchmark
public long smallDataSerial() {
    return IntStream.range(0, 100)
        .map(x -> x * 2)
        .sum();
}

@Benchmark
public long smallDataParallel() {
    return IntStream.range(0, 100)
        .parallel()
        .map(x -> x * 2)
        .sum();
}

// 结果(示例)
// smallDataSerial:    0.05 μs/op
// smallDataParallel:  2.30 μs/op  ❌ 慢46倍

原因分析

  • 线程创建与调度开销:~5-10μs
  • 任务拆分与合并开销
  • 上下文切换开销

2. 计算简单

问题:单个元素的计算耗时极短,并行开销占比过高。

// 简单计算(加法):并行流慢
LongStream.rangeClosed(1, 1_000_000)
    .parallel()
    .sum(); // ❌ 比串行慢

// 复杂计算(耗时操作):并行流快
list.parallelStream()
    .map(x -> {
        // 模拟耗时操作(如HTTP调用、复杂计算)
        Thread.sleep(10);
        return x * 2;
    })
    .collect(Collectors.toList()); // ✅ 比串行快

经验阈值:单元素计算时间需 >10微秒 才适合并行。

3. 数据结构不友好

问题:某些数据结构难以高效拆分。

数据结构 拆分性能 并行效果
ArrayList ✅ 优秀(随机访问) 适合并行
LinkedList ❌ 差(需遍历) 不适合并行
HashSet ⚠️ 一般 需测试
TreeSet ❌ 差 不适合并行
Array ✅ 优秀 适合并行
IntStream.range() ✅ 优秀 适合并行
// LinkedList并行流性能差
LinkedList<Integer> linkedList = new LinkedList<>(/* 大量数据 */);
linkedList.parallelStream() // ❌ 拆分开销大
    .map(x -> x * 2)
    .collect(Collectors.toList());

// 改为ArrayList
ArrayList<Integer> arrayList = new ArrayList<>(linkedList);
arrayList.parallelStream() // ✅ 拆分高效
    .map(x -> x * 2)
    .collect(Collectors.toList());

4. 有状态操作

问题sorted()distinct() 等有状态操作需要等待所有元素。

// 排序操作:并行流可能更慢
list.parallelStream()
    .sorted() // ❌ 需要全局排序,并行优势丧失
    .collect(Collectors.toList());

// 无状态操作:并行流效果好
list.parallelStream()
    .filter(x -> x > 100) // ✅ 每个元素独立处理
    .map(x -> x * 2)
    .collect(Collectors.toList());

5. 线程池污染

问题:所有并行流共享一个 ForkJoinPool.commonPool()

// 场景:Web请求处理中使用并行流
@GetMapping("/data")
public List<String> getData() {
    return largeList.parallelStream() // ❌ 阻塞公共线程池
        .map(this::processItem)
        .collect(Collectors.toList());
}

// 影响:其他并行流也会被阻塞

解决方案:使用自定义线程池

ForkJoinPool customPool = new ForkJoinPool(4);
try {
    return customPool.submit(() ->
        largeList.parallelStream()
            .map(this::processItem)
            .collect(Collectors.toList())
    ).get();
} catch (Exception e) {
    throw new RuntimeException(e);
}

6. 装箱拆箱开销

// 装箱开销:性能差
Stream.of(1, 2, 3, 4, 5) // ❌ Integer对象
    .parallel()
    .mapToInt(x -> x * 2)
    .sum();

// 使用原始类型流:性能好
IntStream.of(1, 2, 3, 4, 5) // ✅ 原始int
    .parallel()
    .map(x -> x * 2)
    .sum();

适合并行流的场景

✅ 场景1:大数据量 + 复杂计算

// 处理100万条数据,每条需要复杂计算
list.parallelStream()
    .map(data -> {
        // 复杂业务逻辑(如加密、解析等)
        return heavyComputation(data);
    })
    .collect(Collectors.toList());

✅ 场景2:IO密集型操作

// 批量HTTP调用
List<String> urls = Arrays.asList(/* 100个URL */);
List<String> results = urls.parallelStream()
    .map(url -> {
        return httpClient.get(url); // IO操作
    })
    .collect(Collectors.toList());

✅ 场景3:独立的数学计算

// 计算大数组的统计信息
double[] data = new double[10_000_000];
double sum = Arrays.stream(data)
    .parallel()
    .sum();

性能测试对比

// JMH基准测试示例
@State(Scope.Benchmark)
public class StreamBenchmark {
    private List<Integer> data;

    @Setup
    public void setup() {
        data = IntStream.range(0, 10_000_000)
            .boxed()
            .collect(Collectors.toList());
    }

    // 串行:简单计算
    @Benchmark
    public long serialSimple() {
        return data.stream()
            .mapToLong(x -> x * 2)
            .sum();
    }

    // 并行:简单计算
    @Benchmark
    public long parallelSimple() {
        return data.parallelStream()
            .mapToLong(x -> x * 2)
            .sum();
    }

    // 串行:复杂计算
    @Benchmark
    public long serialComplex() {
        return data.stream()
            .mapToLong(this::complexComputation)
            .sum();
    }

    // 并行:复杂计算
    @Benchmark
    public long parallelComplex() {
        return data.parallelStream()
            .mapToLong(this::complexComputation)
            .sum();
    }

    private long complexComputation(int x) {
        // 模拟耗时计算
        long result = x;
        for (int i = 0; i < 1000; i++) {
            result = (result * 31 + i) % 1000000007;
        }
        return result;
    }
}

// 典型结果(8核CPU)
// serialSimple:     15 ms/op
// parallelSimple:   12 ms/op   (提升20%)
// serialComplex:    500 ms/op
// parallelComplex:  80 ms/op   (提升525%)

最佳实践

1. 决策树

数据量 < 1000?
  ├─ 是 → 使用串行流
  └─ 否 → 计算简单(<10μs/元素)?
      ├─ 是 → 使用串行流
      └─ 否 → 数据结构可高效拆分?
          ├─ 否 → 使用串行流
          └─ 是 → 是否有状态操作?
              ├─ 是 → 考虑串行流
              └─ 否 → **使用并行流**

2. 性能测试

// 始终通过实际测试验证
long start = System.nanoTime();
// 执行操作
long duration = System.nanoTime() - start;

3. 监控线程池

// 检查ForkJoinPool状态
ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println("并行度: " + pool.getParallelism());
System.out.println("活跃线程: " + pool.getActiveThreadCount());
System.out.println("队列任务: " + pool.getQueuedTaskCount());

答题总结

并行流不一定更快,性能取决于多个因素:

  1. 数据规模:小于1000元素时通常更慢
  2. 计算复杂度:单元素计算需>10μs才有优势
  3. 数据结构:ArrayList、数组适合,LinkedList不适合
  4. 操作类型:无状态操作适合,有状态操作(sorted/distinct)不适合
  5. 线程池竞争:共享ForkJoinPool可能成为瓶颈

最佳实践:优先使用串行流,仅在大数据量+复杂计算场景下,通过性能测试验证后再使用并行流。