核心概念
消息可靠性指的是确保消息不丢失、不重复、不乱序地从生产者传递到消费者。RabbitMQ 通过三个阶段的可靠性保障机制来实现:
生产者 → RabbitMQ → 消费者
① ② ③
- 生产者到 Broker:事务机制 / Confirm 模式
- Broker 内部持久化:Exchange、Queue、Message 持久化
- Broker 到消费者:ACK 确认机制
一、生产者到 Broker 的可靠性
1. 事务机制(不推荐)
通过 AMQP 事务确保消息发送成功,但性能极差(吞吐量下降 250 倍)。
try {
// 开启事务
channel.txSelect();
// 发送消息
channel.basicPublish(exchange, routingKey, null, message.getBytes());
// 提交事务
channel.txCommit();
} catch (Exception e) {
// 回滚事务
channel.txRollback();
}
缺点:
- ❌ 同步阻塞,性能极低
- ❌ 与 Confirm 模式互斥
2. Publisher Confirm 模式(推荐)
生产者发送消息后,Broker 会异步返回确认(ACK/NACK),性能远高于事务。
2.1 同步单条确认
// 开启 Confirm 模式
channel.confirmSelect();
// 发送消息
channel.basicPublish(exchange, routingKey, null, message.getBytes());
// 同步等待确认(阻塞)
if (channel.waitForConfirms()) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
特点:
- ✅ 可靠性高
- ❌ 同步等待,性能一般
2.2 批量确认
channel.confirmSelect();
int batchSize = 100;
int count = 0;
for (String message : messages) {
channel.basicPublish(exchange, routingKey, null, message.getBytes());
count++;
if (count % batchSize == 0) {
channel.waitForConfirms(); // 批量确认
}
}
特点:
- ✅ 性能提升
- ❌ 失败后不知道是哪条消息出错
2.3 异步 Confirm 模式(最佳实践)
channel.confirmSelect();
// 添加确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// 消息确认成功
System.out.println("消息 " + deliveryTag + " 发送成功");
// 从缓存中删除已确认的消息
if (multiple) {
// 批量确认:deliveryTag 之前的所有消息
confirmCache.headMap(deliveryTag, true).clear();
} else {
confirmCache.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// 消息确认失败
System.err.println("消息 " + deliveryTag + " 发送失败");
// 重新发送失败的消息
String message = confirmCache.get(deliveryTag);
channel.basicPublish(exchange, routingKey, null, message.getBytes());
}
});
// 发送消息并记录到缓存
long deliveryTag = channel.getNextPublishSeqNo();
confirmCache.put(deliveryTag, message);
channel.basicPublish(exchange, routingKey, null, message.getBytes());
优点:
- ✅ 异步非阻塞,性能最高
- ✅ 可精确定位失败的消息
- ✅ 生产环境推荐方案
3. Mandatory 参数 + ReturnListener
解决消息无法路由到队列的问题。
// 设置 mandatory=true,无法路由的消息会返回给生产者
channel.basicPublish(exchange, routingKey,
true, // mandatory
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes()
);
// 监听无法路由的消息
channel.addReturnListener((replyCode, replyText, exchange, routingKey,
properties, body) -> {
System.err.println("消息无法路由: " + new String(body));
// 记录日志或重新发送
});
场景:Exchange 存在但没有匹配的队列绑定。
二、Broker 内部持久化
1. Exchange 持久化
// durable=true,Exchange 元数据持久化到磁盘
channel.exchangeDeclare("my_exchange", BuiltinExchangeType.DIRECT, true);
2. Queue 持久化
// durable=true,Queue 元数据持久化
channel.queueDeclare("my_queue",
true, // durable:持久化
false, // exclusive:排他
false, // autoDelete:自动删除
null // arguments
);
3. Message 持久化
// 设置消息为持久化(deliveryMode=2)
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(exchange, routingKey, props, message.getBytes());
持久化原理:
- 消息先写入内存,异步刷盘(批量写入)
- RabbitMQ 重启后,从磁盘恢复持久化的 Exchange、Queue、Message
注意:
- ⚠️ 持久化会降低性能(约 10 倍)
- ⚠️ 即使持久化,也存在消息还在内存时 Broker 宕机的风险
- ✅ 配合 Publisher Confirm 使用可确保消息已刷盘
三、Broker 到消费者的可靠性
1. 手动 ACK 模式(推荐)
// autoAck=false,手动确认模式
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
long deliveryTag = envelope.getDeliveryTag();
try {
// 业务处理
processMessage(new String(body));
// 手动确认(单条)
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 处理失败,拒绝消息
channel.basicNack(deliveryTag,
false, // multiple:是否批量
true // requeue:是否重新入队
);
}
}
});
ACK 模式对比:
| 模式 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| 自动 ACK | 消息发送给消费者后立即确认 | 性能高 | 消费者宕机会丢消息 |
| 手动 ACK | 消费者处理完成后手动确认 | 可靠性高 | 需要处理各种异常情况 |
2. ACK/NACK/Reject 区别
// 1. basicAck:确认消息,从队列删除
channel.basicAck(deliveryTag, false);
// 2. basicNack:拒绝消息,可选择是否重新入队
channel.basicNack(deliveryTag, false, true); // requeue=true 重新入队
// 3. basicReject:拒绝单条消息(不支持批量)
channel.basicReject(deliveryTag, false); // requeue=false 丢弃消息
重新入队的风险:
- ⚠️ 如果消息本身有问题,会导致死循环
- ✅ 建议:失败 N 次后发送到死信队列
3. 预取数量(QoS)
// 限制消费者同时处理的未确认消息数量
channel.basicQos(10); // prefetchCount=10,一次最多拉取 10 条
作用:
- 防止消费者堆积过多未确认消息导致 OOM
- 实现负载均衡(能力强的消费者处理更多消息)
四、消息可靠性完整方案
最佳实践代码
// ========== 生产者侧 ==========
// 1. 开启 Confirm 模式
channel.confirmSelect();
// 2. 添加异步确认监听器
ConcurrentSkipListMap<Long, String> confirmCache = new ConcurrentSkipListMap<>();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
if (multiple) {
confirmCache.headMap(deliveryTag, true).clear();
} else {
confirmCache.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
String msg = confirmCache.get(deliveryTag);
// 重新发送或记录日志
resendMessage(msg);
}
});
// 3. 添加 Return 监听器(处理无法路由的消息)
channel.addReturnListener((replyCode, replyText, exchange, routingKey,
properties, body) -> {
log.error("消息无法路由: {}", new String(body));
// 发送到备用队列或报警
});
// 4. 发送持久化消息
long deliveryTag = channel.getNextPublishSeqNo();
confirmCache.put(deliveryTag, message);
channel.basicPublish(
exchange,
routingKey,
true, // mandatory=true
MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化
message.getBytes()
);
// ========== Broker 侧 ==========
// 持久化 Exchange、Queue
channel.exchangeDeclare("my_exchange", "direct", true);
channel.queueDeclare("my_queue", true, false, false, null);
// ========== 消费者侧 ==========
// 手动 ACK + QoS
channel.basicQos(10);
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
long deliveryTag = envelope.getDeliveryTag();
try {
processMessage(new String(body));
channel.basicAck(deliveryTag, false);
} catch (BusinessException e) {
// 业务异常:丢弃或发送到死信队列
channel.basicNack(deliveryTag, false, false);
} catch (Exception e) {
// 系统异常:重新入队
channel.basicNack(deliveryTag, false, true);
}
}
});
五、高可用与容灾
1. 集群 + 镜像队列
// 设置镜像队列策略(所有节点同步)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'
2. 仲裁队列(Quorum Queue,推荐)
// 声明仲裁队列(基于 Raft 协议,更强一致性)
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("quorum_queue", true, false, false, args);
对比:
- 镜像队列:基于主从同步,性能较好但可能丢消息
- 仲裁队列:基于 Raft 共识算法,可靠性更高但性能略低
六、消息幂等性与去重
1. 全局唯一消息 ID
// 生产者:生成唯一 ID
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());
// 消费者:基于 Redis 去重
String messageId = properties.getMessageId();
if (redis.setNX("msg:" + messageId, "1", 24, TimeUnit.HOURS)) {
// 第一次处理
processMessage(message);
} else {
// 重复消息,直接 ACK
}
channel.basicAck(deliveryTag, false);
2. 业务幂等性设计
- 订单号、流水号等天然幂等
- 数据库唯一索引约束
- 状态机模式(只能从状态 A 转到状态 B)
面试总结
三大可靠性保障:
- 生产者 → Broker:
- Publisher Confirm 异步模式(推荐)
- Mandatory + ReturnListener(处理无法路由的消息)
- Broker 内部:
- Exchange、Queue、Message 三重持久化
- 集群 + 仲裁队列(高可用)
- Broker → 消费者:
- 手动 ACK 模式
- 失败重试 + 死信队列
- 消费端幂等性保证
可能丢消息的场景:
- 生产者发送失败未处理 → Confirm 模式解决
- Broker 宕机消息在内存 → 持久化 + 集群解决
- 消费者自动 ACK 后宕机 → 手动 ACK 解决
性能与可靠性权衡:
- 高可靠:事务模式(不推荐,性能差)
- 较高可靠:持久化 + Confirm + 手动 ACK(推荐)
- 高性能:不持久化 + 自动 ACK(不可靠)
面试加分项:
- 提到消息补偿机制(定时任务扫描未确认的消息)
- 提到监控告警(消息堆积、消费延迟)
- 提到仲裁队列(Quorum Queue)的优势