优惠券下发从30小时优化到20分钟,如何做到?

业务场景

电商大促前夕,需要给1000万用户下发优惠券。

原始方案:单线程遍历用户列表,逐个下发。 问题:预估需要30+小时,无法在活动开始前完成!


性能分析

原始代码

public void sendCoupons(Long couponId) {
    List<Long> userIds = userMapper.findAllEligibleUsers();
    
    for (Long userId : userIds) {
        // 单条处理,每条约10ms
        couponService.send(userId, couponId);
    }
}

耗时分析

1000万用户 × 10ms/用户 = 1亿ms = 约28小时

优化策略

策略一:批量处理

// 原始:逐条插入
for (Long userId : userIds) {
    couponMapper.insert(new Coupon(userId, couponId));
}

// 优化:批量插入
List<List<Long>> batches = Lists.partition(userIds, 1000);
for (List<Long> batch : batches) {
    List<Coupon> coupons = batch.stream()
        .map(uid -> new Coupon(uid, couponId))
        .collect(Collectors.toList());
    couponMapper.batchInsert(coupons);
}

效果:1000条批量插入比逐条快10-20倍。

策略二:多线程并行

@Async
public void sendCouponsAsync(Long couponId) {
    List<Long> userIds = userMapper.findAllEligibleUsers();
    
    // 分成100个批次
    List<List<Long>> batches = Lists.partition(userIds, 100000);
    
    // 并行处理
    List<CompletableFuture<Void>> futures = batches.stream()
        .map(batch -> CompletableFuture.runAsync(() -> 
            processBatch(batch, couponId), executor))
        .collect(Collectors.toList());
    
    // 等待全部完成
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}

private void processBatch(List<Long> userIds, Long couponId) {
    List<List<Long>> subBatches = Lists.partition(userIds, 1000);
    for (List<Long> sub : subBatches) {
        couponMapper.batchInsert(buildCoupons(sub, couponId));
    }
}

效果:50线程并行,理论提升50倍。

策略三:分库分表并行

如果用户表已分库分表,可以按分片并行处理。

public void sendCouponsBySharding(Long couponId) {
    // 获取所有分片
    int shardCount = 32;
    
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    
    for (int i = 0; i < shardCount; i++) {
        int shardIndex = i;
        futures.add(CompletableFuture.runAsync(() -> {
            // 每个分片独立处理
            processShardUsers(shardIndex, couponId);
        }, executor));
    }
    
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}

private void processShardUsers(int shardIndex, Long couponId) {
    // 直接查询对应分片
    List<Long> userIds = userMapper.findByShardIndex(shardIndex);
    // 批量处理
    batchSend(userIds, couponId);
}

策略四:异步消息 + 消费者扩容

// 生产者:快速发送消息
public void sendCouponsViaMQ(Long couponId) {
    List<Long> userIds = userMapper.findAllEligibleUsers();
    
    List<List<Long>> batches = Lists.partition(userIds, 100);
    for (List<Long> batch : batches) {
        CouponMessage msg = new CouponMessage(batch, couponId);
        kafkaTemplate.send("coupon-send", JSON.toJSONString(msg));
    }
}

// 消费者:可水平扩容
@KafkaListener(topics = "coupon-send", concurrency = "20")
public void consume(String message) {
    CouponMessage msg = JSON.parseObject(message, CouponMessage.class);
    couponMapper.batchInsert(buildCoupons(msg.getUserIds(), msg.getCouponId()));
}

优势:消费者可以水平扩容,增加机器即可提速。


数据库优化

减少索引

-- 临时移除非必要索引
ALTER TABLE user_coupon DROP INDEX idx_expire_time;

-- 批量导入完成后重建
ALTER TABLE user_coupon ADD INDEX idx_expire_time (expire_time);

使用LOAD DATA

-- 比INSERT快10-20倍
LOAD DATA LOCAL INFILE '/tmp/coupons.csv'
INTO TABLE user_coupon
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

Java生成CSV

public void exportToCsv(List<Long> userIds, Long couponId) {
    try (FileWriter writer = new FileWriter("/tmp/coupons.csv")) {
        for (Long userId : userIds) {
            writer.write(userId + "," + couponId + "," + expireTime + "\n");
        }
    }
    // 再执行LOAD DATA
}

最终方案

1000万用户
    │
    ├── 分成100个批次(每批10万)
    │
    ├── 50线程并行处理
    │       │
    │       ├── 每批再分1000条子批次
    │       │
    │       └── 批量INSERT
    │
    └── 总计约15-20分钟完成

优化效果

优化项 耗时 提升
原始单线程 28小时 -
+批量插入 3小时 9倍
+多线程(50) 4分钟 420倍
+分库并行 更快 -

注意事项

1. 限流保护

// 控制并发,保护数据库
Semaphore semaphore = new Semaphore(10);

executor.submit(() -> {
    semaphore.acquire();
    try {
        processBatch(batch, couponId);
    } finally {
        semaphore.release();
    }
});

2. 进度监控

AtomicLong processed = new AtomicLong(0);

// 处理完一批后上报
processed.addAndGet(batch.size());
log.info("已处理: {}/{}", processed.get(), total);

3. 断点续传

// 记录处理进度,支持中断恢复
@Transactional
public void processBatch(List<Long> userIds, Long couponId, Long batchId) {
    couponMapper.batchInsert(buildCoupons(userIds, couponId));
    progressMapper.update(batchId, "DONE");
}

面试答题框架

原始问题:1000万用户逐条处理,预估28小时

优化思路:
1. 批量处理:提升10倍
2. 多线程:提升50倍
3. 分库并行:充分利用分片
4. MQ异步:支持水平扩容

最终效果:28小时 → 15-20分钟

关键细节:
- 批量大小:1000条/批
- 线程数:根据DB承受能力调整
- 限流保护:Semaphore控制并发
- 进度监控:支持断点续传

总结

策略 适用场景 提升幅度
批量INSERT 通用 10-20倍
多线程 CPU/IO密集 N倍
分库并行 已分库 分片数倍
MQ消费者扩容 需弹性 水平扩展