核心概念
死信(Dead Letter):无法被正常消费的消息,会被重新路由到另一个 Exchange,这个 Exchange 称为死信交换器(DLX,Dead Letter Exchange),绑定到 DLX 的队列称为死信队列(DLQ)。
作用:
- 保存无法处理的异常消息
- 实现延迟队列功能
- 消息重试机制
- 消息审计与排查
一、消息成为死信的三种情况
1. 消息被拒绝(reject/nack)且不重新入队
// 情况1:basicReject + requeue=false
channel.basicReject(deliveryTag, false);
// 情况2:basicNack + requeue=false
channel.basicNack(deliveryTag, false, false);
场景:消息格式错误、业务逻辑异常,无法正常处理。
2. 消息过期(TTL 超时)
// 方式1:设置队列的消息 TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 队列中所有消息 60 秒过期
channel.queueDeclare("my_queue", true, false, false, args);
// 方式2:设置单条消息的 TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("10000") // 单条消息 10 秒过期
.build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());
注意:
- 队列 TTL:到期后消息立即成为死信
- 单条消息 TTL:只有消息到达队列头部时才会检查是否过期
3. 队列达到最大长度
// 设置队列最大长度
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 1000); // 队列最多 1000 条消息
channel.queueDeclare("my_queue", true, false, false, args);
// 或设置队列最大字节数
args.put("x-max-length-bytes", 1048576); // 队列最多 1MB
场景:队列满时,新消息入队会导致队头消息成为死信。
二、死信队列配置
完整示例:业务队列 + 死信队列
public class DeadLetterQueueExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// ========== 1. 声明死信交换器 ==========
String dlxExchange = "dlx_exchange";
channel.exchangeDeclare(dlxExchange, BuiltinExchangeType.DIRECT, true);
// ========== 2. 声明死信队列 ==========
String dlQueue = "dead_letter_queue";
channel.queueDeclare(dlQueue, true, false, false, null);
// ========== 3. 绑定死信队列到死信交换器 ==========
channel.queueBind(dlQueue, dlxExchange, "dead_letter_routing_key");
// ========== 4. 声明业务队列,关联死信交换器 ==========
String businessQueue = "business_queue";
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", dlxExchange); // 指定 DLX
args.put("x-dead-letter-routing-key", "dead_letter_routing_key"); // 死信路由键
args.put("x-message-ttl", 10000); // 可选:消息 10 秒后过期
args.put("x-max-length", 5); // 可选:队列最多 5 条消息
channel.queueDeclare(businessQueue, true, false, false, args);
// ========== 5. 绑定业务队列到业务交换器 ==========
String businessExchange = "business_exchange";
channel.exchangeDeclare(businessExchange, BuiltinExchangeType.DIRECT, true);
channel.queueBind(businessQueue, businessExchange, "business_key");
System.out.println("死信队列配置完成");
}
}
关键参数:
x-dead-letter-exchange:指定死信交换器x-dead-letter-routing-key:死信路由键(可选,不指定则使用原消息的 routing key)
死信队列消息的特殊属性
死信消息会携带额外的 header 信息:
// 消费死信队列
channel.basicConsume("dead_letter_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
Map<String, Object> headers = properties.getHeaders();
// 获取死信原因
String reason = (String) headers.get("x-first-death-reason");
// 可能的值:rejected, expired, maxlen
// 获取原队列名称
String queue = (String) headers.get("x-first-death-queue");
// 获取原交换器名称
String exchange = (String) headers.get("x-first-death-exchange");
System.out.println("死信原因: " + reason);
System.out.println("原队列: " + queue);
System.out.println("消息内容: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
三、死信队列的应用场景
场景1:实现延迟队列
原理:利用 TTL + 死信队列实现消息延迟消费。
public class DelayQueue {
/**
* 创建延迟队列
* @param delayTime 延迟时间(毫秒)
*/
public static void createDelayQueue(Channel channel, int delayTime)
throws Exception {
// 1. 死信交换器 + 死信队列(真正消费的队列)
String dlxExchange = "delay_dlx";
String dlQueue = "real_consume_queue";
channel.exchangeDeclare(dlxExchange, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(dlQueue, true, false, false, null);
channel.queueBind(dlQueue, dlxExchange, "delay_key");
// 2. 延迟队列(中转队列,不消费)
String delayQueue = "delay_queue_" + delayTime;
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", dlxExchange);
args.put("x-dead-letter-routing-key", "delay_key");
args.put("x-message-ttl", delayTime); // 延迟时间
channel.queueDeclare(delayQueue, true, false, false, args);
// 3. 延迟队列的交换器
String delayExchange = "delay_exchange";
channel.exchangeDeclare(delayExchange, BuiltinExchangeType.DIRECT, true);
channel.queueBind(delayQueue, delayExchange, "delay_" + delayTime);
}
/**
* 发送延迟消息
*/
public static void sendDelayMessage(Channel channel, String message,
int delayTime) throws Exception {
String delayExchange = "delay_exchange";
String routingKey = "delay_" + delayTime;
channel.basicPublish(delayExchange, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println("发送延迟消息: " + message + ", 延迟: " + delayTime + "ms");
}
/**
* 消费延迟后的消息
*/
public static void consumeDelayMessage(Channel channel) throws Exception {
String dlQueue = "real_consume_queue";
channel.basicConsume(dlQueue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("收到延迟消息: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
// 使用示例
public static void main(String[] args) throws Exception {
Channel channel = createChannel();
// 创建 30 秒延迟队列
DelayQueue.createDelayQueue(channel, 30000);
// 发送延迟消息
DelayQueue.sendDelayMessage(channel, "30秒后执行的任务", 30000);
// 消费延迟消息
DelayQueue.consumeDelayMessage(channel);
}
应用场景:
- 订单超时取消(30分钟未支付)
- 定时任务调度
- 消息重试(失败后延迟重试)
注意:
- ⚠️ RabbitMQ 的延迟队列不适合大量不同延迟时间的场景(需要创建多个队列)
- ✅ 推荐使用 RabbitMQ 插件
rabbitmq_delayed_message_exchange实现真正的延迟队列
场景2:消息失败重试机制
public class RetryWithDLQ {
private static final int MAX_RETRY_COUNT = 3;
/**
* 创建业务队列 + 重试队列 + 死信队列
*/
public static void setup(Channel channel) throws Exception {
// 1. 最终死信队列(重试 N 次后的消息)
String finalDLQ = "final_dead_letter_queue";
channel.queueDeclare(finalDLQ, true, false, false, null);
// 2. 重试队列(自动重新路由到业务队列)
String retryExchange = "retry_exchange";
channel.exchangeDeclare(retryExchange, BuiltinExchangeType.DIRECT, true);
String retryQueue = "retry_queue";
Map<String, Object> retryArgs = new HashMap<>();
retryArgs.put("x-dead-letter-exchange", "business_exchange");
retryArgs.put("x-dead-letter-routing-key", "business_key");
retryArgs.put("x-message-ttl", 5000); // 5 秒后重试
channel.queueDeclare(retryQueue, true, false, false, retryArgs);
channel.queueBind(retryQueue, retryExchange, "retry_key");
// 3. 业务队列
String businessQueue = "business_queue";
Map<String, Object> businessArgs = new HashMap<>();
businessArgs.put("x-dead-letter-exchange", retryExchange);
businessArgs.put("x-dead-letter-routing-key", "retry_key");
channel.queueDeclare(businessQueue, true, false, false, businessArgs);
String businessExchange = "business_exchange";
channel.exchangeDeclare(businessExchange, BuiltinExchangeType.DIRECT, true);
channel.queueBind(businessQueue, businessExchange, "business_key");
}
/**
* 消费业务消息,失败自动重试
*/
public static void consumeWithRetry(Channel channel) throws Exception {
channel.basicConsume("business_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
// 获取重试次数
Map<String, Object> headers = properties.getHeaders();
int retryCount = 0;
if (headers != null && headers.containsKey("x-retry-count")) {
retryCount = (int) headers.get("x-retry-count");
}
// 业务处理
processMessage(new String(body));
// 成功:ACK
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
Map<String, Object> headers = properties.getHeaders();
if (headers == null) {
headers = new HashMap<>();
}
int retryCount = (int) headers.getOrDefault("x-retry-count", 0);
retryCount++;
if (retryCount >= MAX_RETRY_COUNT) {
// 超过最大重试次数,发送到最终死信队列
AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("", "final_dead_letter_queue",
newProps, body);
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
// 更新重试次数,拒绝消息(进入重试队列)
headers.put("x-retry-count", retryCount);
AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
// 拒绝消息,进入死信队列(重试队列)
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
}
}
});
}
private static void processMessage(String message) throws Exception {
// 业务逻辑(可能抛出异常)
if (Math.random() > 0.7) {
throw new Exception("业务处理失败");
}
System.out.println("处理消息: " + message);
}
}
重试流程:
业务队列 → 消费失败 → 重试队列(TTL 5秒) → 业务队列 → 消费失败 → ...
↓
重试 3 次后 → 最终死信队列
场景3:消息审计与告警
// 监控死信队列,发现异常消息时告警
channel.basicConsume("dead_letter_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
Map<String, Object> headers = properties.getHeaders();
String reason = (String) headers.get("x-first-death-reason");
String queue = (String) headers.get("x-first-death-queue");
// 记录日志
log.error("发现死信消息 - 原因: {}, 队列: {}, 内容: {}",
reason, queue, new String(body));
// 发送告警(钉钉、邮件等)
alertService.sendAlert("RabbitMQ 死信消息", new String(body));
// 保存到数据库进行排查
deadLetterRepository.save(new DeadLetter(queue, reason, new String(body)));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
四、死信队列的注意事项
1. 死信循环问题
❌ 错误配置:死信队列的死信交换器又指向原队列,导致死循环。
// 错误示例:A 的死信是 B,B 的死信是 A
Map<String, Object> argsA = new HashMap<>();
argsA.put("x-dead-letter-exchange", "exchangeB");
Map<String, Object> argsB = new HashMap<>();
argsB.put("x-dead-letter-exchange", "exchangeA"); // 死循环!
✅ 正确做法:死信队列不再配置 DLX,或配置到最终死信队列。
2. 性能影响
- 死信消息需要重新路由,会增加 Broker 负担
- 大量死信会导致队列堆积
- 建议定期清理死信队列
3. 延迟队列的局限性
使用 TTL + 死信实现延迟队列存在问题:
❌ 队列 TTL 的坑:如果队列中有多个不同 TTL 的消息,只有队头消息过期才会被移除。
// 发送 30 秒 TTL 的消息
channel.basicPublish(exchange, routingKey,
new AMQP.BasicProperties.Builder().expiration("30000").build(),
"msg1".getBytes());
// 发送 5 秒 TTL 的消息
channel.basicPublish(exchange, routingKey,
new AMQP.BasicProperties.Builder().expiration("5000").build(),
"msg2".getBytes());
// 问题:msg2 需要等 msg1 过期后才能被处理(即使 msg2 的 TTL 更短)
✅ 解决方案:
- 为不同延迟时间创建不同队列
- 使用
rabbitmq_delayed_message_exchange插件
五、延迟队列插件(推荐)
安装插件
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用插件
// 声明延迟交换器
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 指定延迟交换器的类型
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
// 绑定队列
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_key");
// 发送延迟消息
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 30000); // 延迟 30 秒
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("delayed_exchange", "delayed_key", props, "延迟消息".getBytes());
优点:
- ✅ 支持任意延迟时间
- ✅ 不会出现队头阻塞问题
- ✅ 性能更好
面试总结
死信队列核心要点:
- 三种死信产生情况:
- 消息被拒绝(reject/nack + requeue=false)
- 消息过期(TTL)
- 队列满(max-length)
- 配置关键参数:
x-dead-letter-exchange:指定死信交换器x-dead-letter-routing-key:死信路由键(可选)
- 典型应用场景:
- 延迟队列:订单超时取消、定时任务
- 消息重试:失败后延迟重试,超过阈值进入最终死信队列
- 异常监控:收集无法处理的消息,告警排查
- 延迟队列实现方案:
- TTL + 死信队列(简单但有局限)
rabbitmq_delayed_message_exchange插件(推荐)
- 注意事项:
- 避免死信循环(A → B → A)
- 队列 TTL 只检查队头消息
- 定期清理死信队列,避免堆积
面试加分项:
- 提到延迟队列插件的优势
- 说明消息重试的指数退避策略
- 提到死信消息的 header 信息(x-first-death-reason 等)