核心概念

消息可靠性指的是确保消息不丢失、不重复、不乱序地从生产者传递到消费者。RabbitMQ 通过三个阶段的可靠性保障机制来实现:

生产者 → RabbitMQ → 消费者
   ①          ②         ③
  1. 生产者到 Broker:事务机制 / Confirm 模式
  2. Broker 内部持久化:Exchange、Queue、Message 持久化
  3. Broker 到消费者:ACK 确认机制

一、生产者到 Broker 的可靠性

1. 事务机制(不推荐)

通过 AMQP 事务确保消息发送成功,但性能极差(吞吐量下降 250 倍)。

try {
    // 开启事务
    channel.txSelect();
    
    // 发送消息
    channel.basicPublish(exchange, routingKey, null, message.getBytes());
    
    // 提交事务
    channel.txCommit();
} catch (Exception e) {
    // 回滚事务
    channel.txRollback();
}

缺点

  • ❌ 同步阻塞,性能极低
  • ❌ 与 Confirm 模式互斥

2. Publisher Confirm 模式(推荐)

生产者发送消息后,Broker 会异步返回确认(ACK/NACK),性能远高于事务。

2.1 同步单条确认

// 开启 Confirm 模式
channel.confirmSelect();

// 发送消息
channel.basicPublish(exchange, routingKey, null, message.getBytes());

// 同步等待确认(阻塞)
if (channel.waitForConfirms()) {
    System.out.println("消息发送成功");
} else {
    System.out.println("消息发送失败");
}

特点

  • ✅ 可靠性高
  • ❌ 同步等待,性能一般

2.2 批量确认

channel.confirmSelect();

int batchSize = 100;
int count = 0;

for (String message : messages) {
    channel.basicPublish(exchange, routingKey, null, message.getBytes());
    count++;
    
    if (count % batchSize == 0) {
        channel.waitForConfirms(); // 批量确认
    }
}

特点

  • ✅ 性能提升
  • ❌ 失败后不知道是哪条消息出错

2.3 异步 Confirm 模式(最佳实践)

channel.confirmSelect();

// 添加确认监听器
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        // 消息确认成功
        System.out.println("消息 " + deliveryTag + " 发送成功");
        // 从缓存中删除已确认的消息
        if (multiple) {
            // 批量确认:deliveryTag 之前的所有消息
            confirmCache.headMap(deliveryTag, true).clear();
        } else {
            confirmCache.remove(deliveryTag);
        }
    }
    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        // 消息确认失败
        System.err.println("消息 " + deliveryTag + " 发送失败");
        // 重新发送失败的消息
        String message = confirmCache.get(deliveryTag);
        channel.basicPublish(exchange, routingKey, null, message.getBytes());
    }
});

// 发送消息并记录到缓存
long deliveryTag = channel.getNextPublishSeqNo();
confirmCache.put(deliveryTag, message);
channel.basicPublish(exchange, routingKey, null, message.getBytes());

优点

  • ✅ 异步非阻塞,性能最高
  • ✅ 可精确定位失败的消息
  • ✅ 生产环境推荐方案

3. Mandatory 参数 + ReturnListener

解决消息无法路由到队列的问题。

// 设置 mandatory=true,无法路由的消息会返回给生产者
channel.basicPublish(exchange, routingKey, 
    true,  // mandatory
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    message.getBytes()
);

// 监听无法路由的消息
channel.addReturnListener((replyCode, replyText, exchange, routingKey, 
                           properties, body) -> {
    System.err.println("消息无法路由: " + new String(body));
    // 记录日志或重新发送
});

场景:Exchange 存在但没有匹配的队列绑定。


二、Broker 内部持久化

1. Exchange 持久化

// durable=true,Exchange 元数据持久化到磁盘
channel.exchangeDeclare("my_exchange", BuiltinExchangeType.DIRECT, true);

2. Queue 持久化

// durable=true,Queue 元数据持久化
channel.queueDeclare("my_queue", 
    true,   // durable:持久化
    false,  // exclusive:排他
    false,  // autoDelete:自动删除
    null    // arguments
);

3. Message 持久化

// 设置消息为持久化(deliveryMode=2)
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(exchange, routingKey, props, message.getBytes());

持久化原理

  • 消息先写入内存,异步刷盘(批量写入)
  • RabbitMQ 重启后,从磁盘恢复持久化的 Exchange、Queue、Message

注意

  • ⚠️ 持久化会降低性能(约 10 倍)
  • ⚠️ 即使持久化,也存在消息还在内存时 Broker 宕机的风险
  • ✅ 配合 Publisher Confirm 使用可确保消息已刷盘

三、Broker 到消费者的可靠性

1. 手动 ACK 模式(推荐)

// autoAck=false,手动确认模式
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            // 业务处理
            processMessage(new String(body));
            
            // 手动确认(单条)
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 处理失败,拒绝消息
            channel.basicNack(deliveryTag, 
                false,  // multiple:是否批量
                true    // requeue:是否重新入队
            );
        }
    }
});

ACK 模式对比

模式 说明 优点 缺点
自动 ACK 消息发送给消费者后立即确认 性能高 消费者宕机会丢消息
手动 ACK 消费者处理完成后手动确认 可靠性高 需要处理各种异常情况

2. ACK/NACK/Reject 区别

// 1. basicAck:确认消息,从队列删除
channel.basicAck(deliveryTag, false);

// 2. basicNack:拒绝消息,可选择是否重新入队
channel.basicNack(deliveryTag, false, true);  // requeue=true 重新入队

// 3. basicReject:拒绝单条消息(不支持批量)
channel.basicReject(deliveryTag, false);  // requeue=false 丢弃消息

重新入队的风险

  • ⚠️ 如果消息本身有问题,会导致死循环
  • ✅ 建议:失败 N 次后发送到死信队列

3. 预取数量(QoS)

// 限制消费者同时处理的未确认消息数量
channel.basicQos(10);  // prefetchCount=10,一次最多拉取 10 条

作用

  • 防止消费者堆积过多未确认消息导致 OOM
  • 实现负载均衡(能力强的消费者处理更多消息)

四、消息可靠性完整方案

最佳实践代码

// ========== 生产者侧 ==========
// 1. 开启 Confirm 模式
channel.confirmSelect();

// 2. 添加异步确认监听器
ConcurrentSkipListMap<Long, String> confirmCache = new ConcurrentSkipListMap<>();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            confirmCache.headMap(deliveryTag, true).clear();
        } else {
            confirmCache.remove(deliveryTag);
        }
    }
    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        String msg = confirmCache.get(deliveryTag);
        // 重新发送或记录日志
        resendMessage(msg);
    }
});

// 3. 添加 Return 监听器(处理无法路由的消息)
channel.addReturnListener((replyCode, replyText, exchange, routingKey, 
                           properties, body) -> {
    log.error("消息无法路由: {}", new String(body));
    // 发送到备用队列或报警
});

// 4. 发送持久化消息
long deliveryTag = channel.getNextPublishSeqNo();
confirmCache.put(deliveryTag, message);
channel.basicPublish(
    exchange, 
    routingKey,
    true,  // mandatory=true
    MessageProperties.PERSISTENT_TEXT_PLAIN,  // 持久化
    message.getBytes()
);

// ========== Broker 侧 ==========
// 持久化 Exchange、Queue
channel.exchangeDeclare("my_exchange", "direct", true);
channel.queueDeclare("my_queue", true, false, false, null);

// ========== 消费者侧 ==========
// 手动 ACK + QoS
channel.basicQos(10);
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        long deliveryTag = envelope.getDeliveryTag();
        try {
            processMessage(new String(body));
            channel.basicAck(deliveryTag, false);
        } catch (BusinessException e) {
            // 业务异常:丢弃或发送到死信队列
            channel.basicNack(deliveryTag, false, false);
        } catch (Exception e) {
            // 系统异常:重新入队
            channel.basicNack(deliveryTag, false, true);
        }
    }
});

五、高可用与容灾

1. 集群 + 镜像队列

// 设置镜像队列策略(所有节点同步)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'

2. 仲裁队列(Quorum Queue,推荐)

// 声明仲裁队列(基于 Raft 协议,更强一致性)
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("quorum_queue", true, false, false, args);

对比

  • 镜像队列:基于主从同步,性能较好但可能丢消息
  • 仲裁队列:基于 Raft 共识算法,可靠性更高但性能略低

六、消息幂等性与去重

1. 全局唯一消息 ID

// 生产者:生成唯一 ID
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .messageId(messageId)
    .build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());

// 消费者:基于 Redis 去重
String messageId = properties.getMessageId();
if (redis.setNX("msg:" + messageId, "1", 24, TimeUnit.HOURS)) {
    // 第一次处理
    processMessage(message);
} else {
    // 重复消息,直接 ACK
}
channel.basicAck(deliveryTag, false);

2. 业务幂等性设计

  • 订单号、流水号等天然幂等
  • 数据库唯一索引约束
  • 状态机模式(只能从状态 A 转到状态 B)

面试总结

三大可靠性保障

  1. 生产者 → Broker
    • Publisher Confirm 异步模式(推荐)
    • Mandatory + ReturnListener(处理无法路由的消息)
  2. Broker 内部
    • Exchange、Queue、Message 三重持久化
    • 集群 + 仲裁队列(高可用)
  3. Broker → 消费者
    • 手动 ACK 模式
    • 失败重试 + 死信队列
    • 消费端幂等性保证

可能丢消息的场景

  • 生产者发送失败未处理 → Confirm 模式解决
  • Broker 宕机消息在内存 → 持久化 + 集群解决
  • 消费者自动 ACK 后宕机 → 手动 ACK 解决

性能与可靠性权衡

  • 高可靠:事务模式(不推荐,性能差)
  • 较高可靠:持久化 + Confirm + 手动 ACK(推荐)
  • 高性能:不持久化 + 自动 ACK(不可靠)

面试加分项

  • 提到消息补偿机制(定时任务扫描未确认的消息)
  • 提到监控告警(消息堆积、消费延迟)
  • 提到仲裁队列(Quorum Queue)的优势