一、核心概念

消息丢失可能发生在三个环节

  1. 生产者发送消息时丢失
  2. Broker 存储消息时丢失
  3. 消费者消费消息时丢失

RocketMQ 通过同步发送、刷盘机制、主从复制、消费确认等手段,在各环节保障消息不丢失。

二、生产者端保证消息不丢失

1. 使用同步发送模式

// ❌ 单向发送:不关心发送结果,可能丢失
producer.sendOneway(msg);

// ⚠️ 异步发送:需要正确处理回调
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 发送成功
    }
    @Override
    public void onException(Throwable e) {
        // ⚠️ 失败时需要重试或记录日志
        log.error("消息发送失败", e);
    }
});

// ✅ 同步发送:最可靠的方式
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    // 发送失败,进行重试或告警
}

2. 配置重试机制

// 设置同步发送失败重试次数(默认2次,共发送3次)
producer.setRetryTimesWhenSendFailed(3);

// 设置异步发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);

3. 发送失败的业务处理

public void sendMessageReliably(Message msg) {
    for (int i = 0; i < 3; i++) {
        try {
            SendResult result = producer.send(msg);
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                return; // 发送成功
            }
        } catch (Exception e) {
            log.error("消息发送失败,第{}次重试", i + 1, e);
            if (i == 2) {
                // 最后一次失败,记录到数据库或告警
                saveToDB(msg);
            }
        }
    }
}

三、Broker端保证消息不丢失

1. 刷盘策略

同步刷盘(最可靠)

# broker.conf 配置
flushDiskType=SYNC_FLUSH
  • 消息写入内存后立即刷盘到磁盘
  • 刷盘成功后才返回成功响应
  • 性能下降约10%,但保证消息持久化

异步刷盘(高性能)

flushDiskType=ASYNC_FLUSH
  • 消息写入内存后立即返回成功
  • 后台线程定期批量刷盘
  • 性能高,但机器突然宕机可能丢失少量消息

刷盘机制原理

同步刷盘流程:
1. 消息写入 PageCache(内存)
2. 调用 GroupCommitService 刷盘
3. 调用 FileChannel.force() 强制写入磁盘
4. 返回成功响应给 Producer

异步刷盘流程:
1. 消息写入 PageCache
2. 立即返回成功
3. 后台线程每500ms或累计4页(16KB)时批量刷盘

2. 主从复制策略

同步复制(最可靠)

# Master Broker 配置
brokerRole=SYNC_MASTER
  • Master 收到消息后同步写入 Slave
  • Slave 写入成功后 Master 才返回成功
  • 保证消息至少有两个副本

异步复制(高性能)

brokerRole=ASYNC_MASTER
  • Master 写入成功立即返回
  • 异步同步给 Slave
  • 性能高,但 Master 宕机可能丢失未同步消息

3. 最高可靠性配置

# broker.conf
# 同步刷盘
flushDiskType=SYNC_FLUSH

# 同步复制
brokerRole=SYNC_MASTER

# Slave Broker 配置
brokerRole=SLAVE

性能与可靠性权衡

配置组合 可靠性 性能 适用场景
同步刷盘 + 同步复制 ⭐⭐⭐⭐⭐ ⭐⭐ 金融、支付等核心业务
同步刷盘 + 异步复制 ⭐⭐⭐⭐ ⭐⭐⭐ 重要业务消息
异步刷盘 + 同步复制 ⭐⭐⭐⭐ ⭐⭐⭐ 重要业务消息
异步刷盘 + 异步复制 ⭐⭐⭐ ⭐⭐⭐⭐⭐ 日志、监控等允许少量丢失的场景

四、消费者端保证消息不丢失

1. 正确处理消费确认

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        
        for (MessageExt msg : msgs) {
            try {
                // 业务处理
                processMessage(msg);
                
                // ✅ 业务处理成功才返回 SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                
            } catch (Exception e) {
                log.error("消息消费失败: {}", msg.getMsgId(), e);
                
                // ⚠️ 业务失败返回 RECONSUME_LATER,触发重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

2. 避免自动提交陷阱

错误示例

// ❌ 异步处理消息,立即返回成功
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
    for (MessageExt msg : msgs) {
        // 异步处理消息
        threadPool.submit(() -> processMessage(msg));
    }
    // ⚠️ 此时消息可能还未处理完,就已返回成功
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

正确做法

// ✅ 同步处理完成后再返回
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
    try {
        for (MessageExt msg : msgs) {
            processMessage(msg); // 同步处理
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3. 消费重试机制

// 设置最大重试次数
consumer.setMaxReconsumeTimes(16); // 默认16次

// 重试时间间隔(由Broker控制,逐步延长)
// 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

4. 处理死信队列

// 消息重试16次后进入死信队列(DLQ)
// 死信队列命名规则:%DLQ% + ConsumerGroup
String dlqTopic = "%DLQ%" + consumerGroup;

// 需要单独监听死信队列,处理失败消息
consumer.subscribe(dlqTopic, "*");

五、分布式场景的最佳实践

1. 完整的可靠性方案

/**
 * 生产者:带重试的可靠发送
 */
public class ReliableProducer {
    
    public void sendReliably(String topic, String body) {
        Message msg = new Message(topic, body.getBytes());
        
        try {
            // 同步发送
            SendResult result = producer.send(msg);
            
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                log.info("消息发送成功: {}", result.getMsgId());
            } else {
                // 发送失败,记录到DB用于补偿
                saveFailedMessage(msg);
            }
            
        } catch (Exception e) {
            // 异常时记录到DB
            saveFailedMessage(msg);
            log.error("消息发送异常", e);
        }
    }
}

/**
 * 消费者:幂等消费
 */
public class ReliableConsumer {
    
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ...) {
        for (MessageExt msg : msgs) {
            String msgId = msg.getMsgId();
            
            // 1. 幂等性检查(防止重复消费)
            if (isConsumed(msgId)) {
                continue;
            }
            
            try {
                // 2. 业务处理
                processMessage(msg);
                
                // 3. 标记已消费
                markConsumed(msgId);
                
            } catch (Exception e) {
                log.error("消息消费失败: {}", msgId, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

2. 消息轨迹追踪

// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer(
    "ProducerGroup",
    true,  // 开启消息轨迹
    null   // 自定义轨迹Topic,默认 RMQ_SYS_TRACE_TOPIC
);

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
    "ConsumerGroup",
    true,  // 开启消息轨迹
    null
);

六、面试总结

三个环节的保障措施

环节 可能丢失的原因 保障措施
生产者 网络故障、Broker宕机 同步发送 + 重试机制 + 失败补偿
Broker 机器宕机、磁盘故障 同步刷盘 + 主从同步复制
消费者 消费异常、业务失败 手动ACK + 消费重试 + 幂等性保证

关键配置

# 最高可靠性配置(牺牲性能)
flushDiskType=SYNC_FLUSH          # 同步刷盘
brokerRole=SYNC_MASTER            # 同步复制

# 生产者
retryTimesWhenSendFailed=3        # 发送失败重试3次

# 消费者
maxReconsumeTimes=16              # 消费失败重试16次

答题要点

  1. 三个环节分别说明:生产者、Broker、消费者
  2. Broker 两个关键:同步刷盘 + 主从同步复制
  3. 消费者幂等性:避免重复消费导致业务异常
  4. 死信队列处理:超过最大重试次数的消息
  5. 性能权衡:金融场景用同步,日志场景用异步

延伸问题

  • RocketMQ 的同步刷盘是如何实现的?
  • 消息重复消费如何解决?(幂等性设计)
  • 如何实现消息的最终一致性?