1. 核心概念
消息积压(Message Backlog) 指的是在消息队列(MQ)中,生产者发送消息的速度持续超过消费者处理消息的速度,导致大量消息滞留在队列中无法被及时消费。这种情况通常发生在流量突峰、消费者故障或处理逻辑变慢时。
处理积压的核心思路是:“降级止损” 和 “提升消费能力”。
2. 原理与解决方案
解决消息积压通常分为两个阶段:紧急处理 和 根本解决。
2.1 紧急处理(短期方案)
当线上发生严重积压时,首要目标是尽快恢复业务,而不是立即排查代码细节。
- 临时扩容(Consumer 扩容)
- 如果当前 Topic 的分区(Partition/Queue)数量 > 消费者数量,直接增加消费者实例即可。
- 关键点:如果消费者数量已经等于分区数量,单纯增加消费者无效(因为一个分区通常只能被一个消费者消费)。
- 操作步骤:
- 新建一个临时的 Topic,Partition 数量扩容为原来的 N 倍(例如 10 倍)。
- 编写一个临时的转发程序(Consumer),只负责从原 Topic 拉取消息并快速写入到临时 Topic 中(不做任何业务逻辑,速度极快)。
- 部署 N 倍数量的消费者订阅临时 Topic,进行真正的业务消费。
- 积压消费完毕后,恢复原架构。
- 服务降级
- 关闭非核心业务的消费,腾出资源给核心业务。
- 如果业务允许部分数据丢失(如日志收集),可暂时开启“丢弃模式”,直接丢弃旧消息或低优先级消息。
2.2 根本解决(长期方案)
- 优化消费端逻辑
- 批量消费:调整参数(如 Kafka 的
max.poll.records),一次拉取多条数据批量处理,减少网络交互和 IO 开销。 - 异步并发:在消费者内部使用线程池,将拉取到的消息分发给 Worker 线程处理,实现“单 Consumer 多线程”模型。
- 排查慢 SQL/外部调用:积压往往是因为下游数据库慢或第三方接口超时,需重点排查并优化。
- 批量消费:调整参数(如 Kafka 的
- 生产端限流
- 在发送端进行限流,避免瞬时流量压垮消费端,虽然这可能导致前端请求失败,但保护了后端系统。
3. 考量点
- 消息顺序性:使用临时扩容方案(转发到新 Topic)时,很难保证全局顺序性。如果业务强依赖顺序,只能在原 Consumer 内部优化(如按 ID Hash 到不同内存队列)。
- 幂等性:积压处理过程中可能会涉及重试或重复消费,必须保证消费逻辑的幂等性。
- 监控告警:预防胜于治疗。应建立完善的监控体系,对
Lag(积压量)设置阈值告警,在积压初期就介入处理。
4. 示例与总结
回答总结: “处理消息积压主要分两步走。 第一,紧急应对。如果积压严重影响业务,我会采用‘临时扩容法’:新建一个 Partition 数更多的临时 Topic,写一个只负责转发不处理业务的临时 Consumer,把积压消息搬运到新 Topic,然后用大量新 Consumer 并行消费。 第二,根源治理。事后排查原因,通常是消费端慢。我会优化消费逻辑,比如改用批量消费、引入本地线程池并发处理、优化慢 SQL 或下游接口调用。同时,加强监控,设置积压告警,防止下次复发。”
// 伪代码:消费者内部使用线程池并发处理(提升单机吞吐)
public class ConcurrentConsumer {
private ExecutorService executor = Executors.newFixedThreadPool(10);
public void consume() {
while (true) {
List<Message> messages = consumer.poll();
for (Message msg : messages) {
// 提交到线程池异步处理,而不是在主线程阻塞
executor.submit(() -> processBusinessLogic(msg));
}
}
}
}