核心概念

死信(Dead Letter):无法被正常消费的消息,会被重新路由到另一个 Exchange,这个 Exchange 称为死信交换器(DLX,Dead Letter Exchange),绑定到 DLX 的队列称为死信队列(DLQ)

作用

  • 保存无法处理的异常消息
  • 实现延迟队列功能
  • 消息重试机制
  • 消息审计与排查

一、消息成为死信的三种情况

1. 消息被拒绝(reject/nack)且不重新入队

// 情况1:basicReject + requeue=false
channel.basicReject(deliveryTag, false);

// 情况2:basicNack + requeue=false
channel.basicNack(deliveryTag, false, false);

场景:消息格式错误、业务逻辑异常,无法正常处理。


2. 消息过期(TTL 超时)

// 方式1:设置队列的消息 TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);  // 队列中所有消息 60 秒过期
channel.queueDeclare("my_queue", true, false, false, args);

// 方式2:设置单条消息的 TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .expiration("10000")  // 单条消息 10 秒过期
    .build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());

注意

  • 队列 TTL:到期后消息立即成为死信
  • 单条消息 TTL:只有消息到达队列头部时才会检查是否过期

3. 队列达到最大长度

// 设置队列最大长度
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 1000);  // 队列最多 1000 条消息
channel.queueDeclare("my_queue", true, false, false, args);

// 或设置队列最大字节数
args.put("x-max-length-bytes", 1048576);  // 队列最多 1MB

场景:队列满时,新消息入队会导致队头消息成为死信。


二、死信队列配置

完整示例:业务队列 + 死信队列

public class DeadLetterQueueExample {
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // ========== 1. 声明死信交换器 ==========
        String dlxExchange = "dlx_exchange";
        channel.exchangeDeclare(dlxExchange, BuiltinExchangeType.DIRECT, true);
        
        // ========== 2. 声明死信队列 ==========
        String dlQueue = "dead_letter_queue";
        channel.queueDeclare(dlQueue, true, false, false, null);
        
        // ========== 3. 绑定死信队列到死信交换器 ==========
        channel.queueBind(dlQueue, dlxExchange, "dead_letter_routing_key");
        
        // ========== 4. 声明业务队列,关联死信交换器 ==========
        String businessQueue = "business_queue";
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", dlxExchange);  // 指定 DLX
        args.put("x-dead-letter-routing-key", "dead_letter_routing_key");  // 死信路由键
        args.put("x-message-ttl", 10000);  // 可选:消息 10 秒后过期
        args.put("x-max-length", 5);  // 可选:队列最多 5 条消息
        
        channel.queueDeclare(businessQueue, true, false, false, args);
        
        // ========== 5. 绑定业务队列到业务交换器 ==========
        String businessExchange = "business_exchange";
        channel.exchangeDeclare(businessExchange, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(businessQueue, businessExchange, "business_key");
        
        System.out.println("死信队列配置完成");
    }
}

关键参数

  • x-dead-letter-exchange:指定死信交换器
  • x-dead-letter-routing-key:死信路由键(可选,不指定则使用原消息的 routing key)

死信队列消息的特殊属性

死信消息会携带额外的 header 信息:

// 消费死信队列
channel.basicConsume("dead_letter_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        Map<String, Object> headers = properties.getHeaders();
        
        // 获取死信原因
        String reason = (String) headers.get("x-first-death-reason");
        // 可能的值:rejected, expired, maxlen
        
        // 获取原队列名称
        String queue = (String) headers.get("x-first-death-queue");
        
        // 获取原交换器名称
        String exchange = (String) headers.get("x-first-death-exchange");
        
        System.out.println("死信原因: " + reason);
        System.out.println("原队列: " + queue);
        System.out.println("消息内容: " + new String(body));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

三、死信队列的应用场景

场景1:实现延迟队列

原理:利用 TTL + 死信队列实现消息延迟消费。

public class DelayQueue {
    
    /**
     * 创建延迟队列
     * @param delayTime 延迟时间(毫秒)
     */
    public static void createDelayQueue(Channel channel, int delayTime) 
            throws Exception {
        
        // 1. 死信交换器 + 死信队列(真正消费的队列)
        String dlxExchange = "delay_dlx";
        String dlQueue = "real_consume_queue";
        channel.exchangeDeclare(dlxExchange, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(dlQueue, true, false, false, null);
        channel.queueBind(dlQueue, dlxExchange, "delay_key");
        
        // 2. 延迟队列(中转队列,不消费)
        String delayQueue = "delay_queue_" + delayTime;
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", dlxExchange);
        args.put("x-dead-letter-routing-key", "delay_key");
        args.put("x-message-ttl", delayTime);  // 延迟时间
        
        channel.queueDeclare(delayQueue, true, false, false, args);
        
        // 3. 延迟队列的交换器
        String delayExchange = "delay_exchange";
        channel.exchangeDeclare(delayExchange, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(delayQueue, delayExchange, "delay_" + delayTime);
    }
    
    /**
     * 发送延迟消息
     */
    public static void sendDelayMessage(Channel channel, String message, 
                                        int delayTime) throws Exception {
        String delayExchange = "delay_exchange";
        String routingKey = "delay_" + delayTime;
        channel.basicPublish(delayExchange, routingKey, 
            MessageProperties.PERSISTENT_TEXT_PLAIN, 
            message.getBytes());
        System.out.println("发送延迟消息: " + message + ", 延迟: " + delayTime + "ms");
    }
    
    /**
     * 消费延迟后的消息
     */
    public static void consumeDelayMessage(Channel channel) throws Exception {
        String dlQueue = "real_consume_queue";
        channel.basicConsume(dlQueue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) 
                    throws IOException {
                System.out.println("收到延迟消息: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

// 使用示例
public static void main(String[] args) throws Exception {
    Channel channel = createChannel();
    
    // 创建 30 秒延迟队列
    DelayQueue.createDelayQueue(channel, 30000);
    
    // 发送延迟消息
    DelayQueue.sendDelayMessage(channel, "30秒后执行的任务", 30000);
    
    // 消费延迟消息
    DelayQueue.consumeDelayMessage(channel);
}

应用场景

  • 订单超时取消(30分钟未支付)
  • 定时任务调度
  • 消息重试(失败后延迟重试)

注意

  • ⚠️ RabbitMQ 的延迟队列不适合大量不同延迟时间的场景(需要创建多个队列)
  • ✅ 推荐使用 RabbitMQ 插件 rabbitmq_delayed_message_exchange 实现真正的延迟队列

场景2:消息失败重试机制

public class RetryWithDLQ {
    
    private static final int MAX_RETRY_COUNT = 3;
    
    /**
     * 创建业务队列 + 重试队列 + 死信队列
     */
    public static void setup(Channel channel) throws Exception {
        // 1. 最终死信队列(重试 N 次后的消息)
        String finalDLQ = "final_dead_letter_queue";
        channel.queueDeclare(finalDLQ, true, false, false, null);
        
        // 2. 重试队列(自动重新路由到业务队列)
        String retryExchange = "retry_exchange";
        channel.exchangeDeclare(retryExchange, BuiltinExchangeType.DIRECT, true);
        
        String retryQueue = "retry_queue";
        Map<String, Object> retryArgs = new HashMap<>();
        retryArgs.put("x-dead-letter-exchange", "business_exchange");
        retryArgs.put("x-dead-letter-routing-key", "business_key");
        retryArgs.put("x-message-ttl", 5000);  // 5 秒后重试
        
        channel.queueDeclare(retryQueue, true, false, false, retryArgs);
        channel.queueBind(retryQueue, retryExchange, "retry_key");
        
        // 3. 业务队列
        String businessQueue = "business_queue";
        Map<String, Object> businessArgs = new HashMap<>();
        businessArgs.put("x-dead-letter-exchange", retryExchange);
        businessArgs.put("x-dead-letter-routing-key", "retry_key");
        
        channel.queueDeclare(businessQueue, true, false, false, businessArgs);
        
        String businessExchange = "business_exchange";
        channel.exchangeDeclare(businessExchange, BuiltinExchangeType.DIRECT, true);
        channel.queueBind(businessQueue, businessExchange, "business_key");
    }
    
    /**
     * 消费业务消息,失败自动重试
     */
    public static void consumeWithRetry(Channel channel) throws Exception {
        channel.basicConsume("business_queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) 
                    throws IOException {
                try {
                    // 获取重试次数
                    Map<String, Object> headers = properties.getHeaders();
                    int retryCount = 0;
                    if (headers != null && headers.containsKey("x-retry-count")) {
                        retryCount = (int) headers.get("x-retry-count");
                    }
                    
                    // 业务处理
                    processMessage(new String(body));
                    
                    // 成功:ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    
                } catch (Exception e) {
                    Map<String, Object> headers = properties.getHeaders();
                    if (headers == null) {
                        headers = new HashMap<>();
                    }
                    
                    int retryCount = (int) headers.getOrDefault("x-retry-count", 0);
                    retryCount++;
                    
                    if (retryCount >= MAX_RETRY_COUNT) {
                        // 超过最大重试次数,发送到最终死信队列
                        AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
                            .headers(headers)
                            .build();
                        channel.basicPublish("", "final_dead_letter_queue", 
                            newProps, body);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } else {
                        // 更新重试次数,拒绝消息(进入重试队列)
                        headers.put("x-retry-count", retryCount);
                        AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
                            .headers(headers)
                            .build();
                        
                        // 拒绝消息,进入死信队列(重试队列)
                        channel.basicNack(envelope.getDeliveryTag(), false, false);
                    }
                }
            }
        });
    }
    
    private static void processMessage(String message) throws Exception {
        // 业务逻辑(可能抛出异常)
        if (Math.random() > 0.7) {
            throw new Exception("业务处理失败");
        }
        System.out.println("处理消息: " + message);
    }
}

重试流程

业务队列 → 消费失败 → 重试队列(TTL 5秒) → 业务队列 → 消费失败 → ...
                                                    ↓
                                           重试 3 次后 → 最终死信队列

场景3:消息审计与告警

// 监控死信队列,发现异常消息时告警
channel.basicConsume("dead_letter_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) 
            throws IOException {
        Map<String, Object> headers = properties.getHeaders();
        String reason = (String) headers.get("x-first-death-reason");
        String queue = (String) headers.get("x-first-death-queue");
        
        // 记录日志
        log.error("发现死信消息 - 原因: {}, 队列: {}, 内容: {}", 
            reason, queue, new String(body));
        
        // 发送告警(钉钉、邮件等)
        alertService.sendAlert("RabbitMQ 死信消息", new String(body));
        
        // 保存到数据库进行排查
        deadLetterRepository.save(new DeadLetter(queue, reason, new String(body)));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

四、死信队列的注意事项

1. 死信循环问题

错误配置:死信队列的死信交换器又指向原队列,导致死循环。

// 错误示例:A 的死信是 B,B 的死信是 A
Map<String, Object> argsA = new HashMap<>();
argsA.put("x-dead-letter-exchange", "exchangeB");

Map<String, Object> argsB = new HashMap<>();
argsB.put("x-dead-letter-exchange", "exchangeA");  // 死循环!

正确做法:死信队列不再配置 DLX,或配置到最终死信队列。


2. 性能影响

  • 死信消息需要重新路由,会增加 Broker 负担
  • 大量死信会导致队列堆积
  • 建议定期清理死信队列

3. 延迟队列的局限性

使用 TTL + 死信实现延迟队列存在问题:

队列 TTL 的坑:如果队列中有多个不同 TTL 的消息,只有队头消息过期才会被移除。

// 发送 30 秒 TTL 的消息
channel.basicPublish(exchange, routingKey, 
    new AMQP.BasicProperties.Builder().expiration("30000").build(), 
    "msg1".getBytes());

// 发送 5 秒 TTL 的消息
channel.basicPublish(exchange, routingKey, 
    new AMQP.BasicProperties.Builder().expiration("5000").build(), 
    "msg2".getBytes());

// 问题:msg2 需要等 msg1 过期后才能被处理(即使 msg2 的 TTL 更短)

解决方案

  1. 为不同延迟时间创建不同队列
  2. 使用 rabbitmq_delayed_message_exchange 插件

五、延迟队列插件(推荐)

安装插件

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用插件

// 声明延迟交换器
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");  // 指定延迟交换器的类型
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);

// 绑定队列
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_key");

// 发送延迟消息
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 30000);  // 延迟 30 秒
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(headers)
    .build();
channel.basicPublish("delayed_exchange", "delayed_key", props, "延迟消息".getBytes());

优点

  • ✅ 支持任意延迟时间
  • ✅ 不会出现队头阻塞问题
  • ✅ 性能更好

面试总结

死信队列核心要点

  1. 三种死信产生情况
    • 消息被拒绝(reject/nack + requeue=false)
    • 消息过期(TTL)
    • 队列满(max-length)
  2. 配置关键参数
    • x-dead-letter-exchange:指定死信交换器
    • x-dead-letter-routing-key:死信路由键(可选)
  3. 典型应用场景
    • 延迟队列:订单超时取消、定时任务
    • 消息重试:失败后延迟重试,超过阈值进入最终死信队列
    • 异常监控:收集无法处理的消息,告警排查
  4. 延迟队列实现方案
    • TTL + 死信队列(简单但有局限)
    • rabbitmq_delayed_message_exchange 插件(推荐)
  5. 注意事项
    • 避免死信循环(A → B → A)
    • 队列 TTL 只检查队头消息
    • 定期清理死信队列,避免堆积

面试加分项

  • 提到延迟队列插件的优势
  • 说明消息重试的指数退避策略
  • 提到死信消息的 header 信息(x-first-death-reason 等)