一、核心概念
消息丢失可能发生在三个环节:
- 生产者发送消息时丢失
- Broker 存储消息时丢失
- 消费者消费消息时丢失
RocketMQ 通过同步发送、刷盘机制、主从复制、消费确认等手段,在各环节保障消息不丢失。
二、生产者端保证消息不丢失
1. 使用同步发送模式
// ❌ 单向发送:不关心发送结果,可能丢失
producer.sendOneway(msg);
// ⚠️ 异步发送:需要正确处理回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功
}
@Override
public void onException(Throwable e) {
// ⚠️ 失败时需要重试或记录日志
log.error("消息发送失败", e);
}
});
// ✅ 同步发送:最可靠的方式
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 发送失败,进行重试或告警
}
2. 配置重试机制
// 设置同步发送失败重试次数(默认2次,共发送3次)
producer.setRetryTimesWhenSendFailed(3);
// 设置异步发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);
3. 发送失败的业务处理
public void sendMessageReliably(Message msg) {
for (int i = 0; i < 3; i++) {
try {
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return; // 发送成功
}
} catch (Exception e) {
log.error("消息发送失败,第{}次重试", i + 1, e);
if (i == 2) {
// 最后一次失败,记录到数据库或告警
saveToDB(msg);
}
}
}
}
三、Broker端保证消息不丢失
1. 刷盘策略
同步刷盘(最可靠):
# broker.conf 配置
flushDiskType=SYNC_FLUSH
- 消息写入内存后立即刷盘到磁盘
- 刷盘成功后才返回成功响应
- 性能下降约10%,但保证消息持久化
异步刷盘(高性能):
flushDiskType=ASYNC_FLUSH
- 消息写入内存后立即返回成功
- 后台线程定期批量刷盘
- 性能高,但机器突然宕机可能丢失少量消息
刷盘机制原理:
同步刷盘流程:
1. 消息写入 PageCache(内存)
2. 调用 GroupCommitService 刷盘
3. 调用 FileChannel.force() 强制写入磁盘
4. 返回成功响应给 Producer
异步刷盘流程:
1. 消息写入 PageCache
2. 立即返回成功
3. 后台线程每500ms或累计4页(16KB)时批量刷盘
2. 主从复制策略
同步复制(最可靠):
# Master Broker 配置
brokerRole=SYNC_MASTER
- Master 收到消息后同步写入 Slave
- Slave 写入成功后 Master 才返回成功
- 保证消息至少有两个副本
异步复制(高性能):
brokerRole=ASYNC_MASTER
- Master 写入成功立即返回
- 异步同步给 Slave
- 性能高,但 Master 宕机可能丢失未同步消息
3. 最高可靠性配置
# broker.conf
# 同步刷盘
flushDiskType=SYNC_FLUSH
# 同步复制
brokerRole=SYNC_MASTER
# Slave Broker 配置
brokerRole=SLAVE
性能与可靠性权衡:
| 配置组合 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|
| 同步刷盘 + 同步复制 | ⭐⭐⭐⭐⭐ | ⭐⭐ | 金融、支付等核心业务 |
| 同步刷盘 + 异步复制 | ⭐⭐⭐⭐ | ⭐⭐⭐ | 重要业务消息 |
| 异步刷盘 + 同步复制 | ⭐⭐⭐⭐ | ⭐⭐⭐ | 重要业务消息 |
| 异步刷盘 + 异步复制 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 日志、监控等允许少量丢失的场景 |
四、消费者端保证消息不丢失
1. 正确处理消费确认
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 业务处理
processMessage(msg);
// ✅ 业务处理成功才返回 SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消息消费失败: {}", msg.getMsgId(), e);
// ⚠️ 业务失败返回 RECONSUME_LATER,触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
2. 避免自动提交陷阱
错误示例:
// ❌ 异步处理消息,立即返回成功
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
for (MessageExt msg : msgs) {
// 异步处理消息
threadPool.submit(() -> processMessage(msg));
}
// ⚠️ 此时消息可能还未处理完,就已返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
正确做法:
// ✅ 同步处理完成后再返回
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
try {
for (MessageExt msg : msgs) {
processMessage(msg); // 同步处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3. 消费重试机制
// 设置最大重试次数
consumer.setMaxReconsumeTimes(16); // 默认16次
// 重试时间间隔(由Broker控制,逐步延长)
// 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
4. 处理死信队列
// 消息重试16次后进入死信队列(DLQ)
// 死信队列命名规则:%DLQ% + ConsumerGroup
String dlqTopic = "%DLQ%" + consumerGroup;
// 需要单独监听死信队列,处理失败消息
consumer.subscribe(dlqTopic, "*");
五、分布式场景的最佳实践
1. 完整的可靠性方案
/**
* 生产者:带重试的可靠发送
*/
public class ReliableProducer {
public void sendReliably(String topic, String body) {
Message msg = new Message(topic, body.getBytes());
try {
// 同步发送
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
log.info("消息发送成功: {}", result.getMsgId());
} else {
// 发送失败,记录到DB用于补偿
saveFailedMessage(msg);
}
} catch (Exception e) {
// 异常时记录到DB
saveFailedMessage(msg);
log.error("消息发送异常", e);
}
}
}
/**
* 消费者:幂等消费
*/
public class ReliableConsumer {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 1. 幂等性检查(防止重复消费)
if (isConsumed(msgId)) {
continue;
}
try {
// 2. 业务处理
processMessage(msg);
// 3. 标记已消费
markConsumed(msgId);
} catch (Exception e) {
log.error("消息消费失败: {}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
2. 消息轨迹追踪
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer(
"ProducerGroup",
true, // 开启消息轨迹
null // 自定义轨迹Topic,默认 RMQ_SYS_TRACE_TOPIC
);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroup",
true, // 开启消息轨迹
null
);
六、面试总结
三个环节的保障措施
| 环节 | 可能丢失的原因 | 保障措施 |
|---|---|---|
| 生产者 | 网络故障、Broker宕机 | 同步发送 + 重试机制 + 失败补偿 |
| Broker | 机器宕机、磁盘故障 | 同步刷盘 + 主从同步复制 |
| 消费者 | 消费异常、业务失败 | 手动ACK + 消费重试 + 幂等性保证 |
关键配置
# 最高可靠性配置(牺牲性能)
flushDiskType=SYNC_FLUSH # 同步刷盘
brokerRole=SYNC_MASTER # 同步复制
# 生产者
retryTimesWhenSendFailed=3 # 发送失败重试3次
# 消费者
maxReconsumeTimes=16 # 消费失败重试16次
答题要点
- 三个环节分别说明:生产者、Broker、消费者
- Broker 两个关键:同步刷盘 + 主从同步复制
- 消费者幂等性:避免重复消费导致业务异常
- 死信队列处理:超过最大重试次数的消息
- 性能权衡:金融场景用同步,日志场景用异步
延伸问题:
- RocketMQ 的同步刷盘是如何实现的?
- 消息重复消费如何解决?(幂等性设计)
- 如何实现消息的最终一致性?