核心概念

消息队列(Message Queue,MQ) 是一种异步通信中间件,通过在应用程序之间传递消息来实现系统解耦。其核心思想是:生产者将消息发送到队列,消费者从队列中获取消息进行处理,生产者和消费者无需直接交互

核心价值

  • 解耦:降低系统间的依赖关系
  • 异步:提升系统响应速度
  • 削峰填谷:平滑流量波动
  • 可靠性:保证消息不丢失

六大核心使用场景

1. 异步处理(提升响应速度)

场景描述:用户注册后需要发送邮件、短信通知,如果同步执行会导致响应时间过长。

传统同步方式

// 用户注册(总耗时 = 注册 + 邮件 + 短信 = 50ms + 100ms + 100ms = 250ms)
@PostMapping("/register")
public Result register(User user) {
    // 1. 保存用户信息 (50ms)
    userService.saveUser(user);
    
    // 2. 发送注册邮件 (100ms)
    emailService.sendEmail(user.getEmail());
    
    // 3. 发送短信通知 (100ms)
    smsService.sendSms(user.getPhone());
    
    return Result.success("注册成功");
}

使用消息队列优化

// 用户注册(总耗时 = 注册 + 发送消息 = 50ms + 5ms = 55ms)
@PostMapping("/register")
public Result register(User user) {
    // 1. 保存用户信息 (50ms)
    userService.saveUser(user);
    
    // 2. 发送消息到队列 (5ms)
    mqProducer.send("user.register", new RegisterEvent(user));
    
    return Result.success("注册成功");
}

// 消费者异步处理
@RabbitListener(queues = "user.register")
public void handleRegister(RegisterEvent event) {
    // 异步发送邮件
    emailService.sendEmail(event.getEmail());
    
    // 异步发送短信
    smsService.sendSms(event.getPhone());
}

性能提升:响应时间从 250ms 降低到 55ms,用户体验显著提升。


2. 系统解耦(降低依赖关系)

场景描述:订单系统需要通知多个下游系统(库存、积分、物流),如果直接调用会导致强耦合。

传统强耦合方式

// 订单服务直接依赖多个系统
@Service
public class OrderService {
    @Autowired
    private InventoryService inventoryService;  // 依赖库存系统
    @Autowired
    private PointsService pointsService;        // 依赖积分系统
    @Autowired
    private LogisticsService logisticsService;  // 依赖物流系统
    
    public void createOrder(Order order) {
        // 保存订单
        orderMapper.insert(order);
        
        // 通知多个系统(强耦合)
        inventoryService.deductStock(order);
        pointsService.addPoints(order);
        logisticsService.createShipment(order);
    }
}

问题

  • 如果新增一个下游系统(如营销系统),需要修改订单服务代码
  • 如果某个下游系统不可用,订单创建会失败

使用消息队列解耦

// 订单服务只需发送事件,无需关心下游系统
@Service
public class OrderService {
    @Autowired
    private RocketMQTemplate mqTemplate;
    
    public void createOrder(Order order) {
        // 保存订单
        orderMapper.insert(order);
        
        // 发布订单创建事件
        mqTemplate.convertAndSend("order.created", new OrderCreatedEvent(order));
    }
}

// 库存服务消费消息
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    public void onMessage(OrderCreatedEvent event) {
        inventoryService.deductStock(event.getOrder());
    }
}

// 积分服务消费消息
@RocketMQMessageListener(topic = "order.created", consumerGroup = "points-group")
public class PointsConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    public void onMessage(OrderCreatedEvent event) {
        pointsService.addPoints(event.getOrder());
    }
}

// 物流服务消费消息(新增系统无需修改订单服务)
@RocketMQMessageListener(topic = "order.created", consumerGroup = "logistics-group")
public class LogisticsConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    public void onMessage(OrderCreatedEvent event) {
        logisticsService.createShipment(event.getOrder());
    }
}

优势

  • 订单服务与下游系统完全解耦
  • 新增/删除下游系统无需修改订单服务
  • 某个下游系统故障不影响订单创建

3. 流量削峰填谷(应对突发流量)

场景描述:秒杀活动瞬间涌入10万并发请求,数据库无法承受。

传统方式

// 直接写数据库,瞬间压垮数据库
@PostMapping("/seckill")
public Result seckill(Long productId, Long userId) {
    // 直接操作数据库(10万并发 → 数据库崩溃)
    boolean success = seckillService.processSeckill(productId, userId);
    return success ? Result.success() : Result.fail("秒杀失败");
}

使用消息队列削峰

// 将请求写入消息队列,慢慢消费
@PostMapping("/seckill")
public Result seckill(Long productId, Long userId) {
    // 1. 校验库存(基于 Redis)
    if (!redisTemplate.opsForValue().decrement("stock:" + productId) >= 0) {
        return Result.fail("库存不足");
    }
    
    // 2. 发送消息到队列(10万 QPS → 队列缓冲)
    mqProducer.send("seckill.order", new SeckillOrder(productId, userId));
    
    return Result.success("排队中,请稍后查看结果");
}

// 消费者按固定速率消费(如 1000 TPS)
@RabbitListener(queues = "seckill.order", concurrency = "10")
public void processSeckill(SeckillOrder order) {
    // 从队列慢慢消费,保护数据库
    seckillService.createOrder(order);
}

效果对比

方式 瞬时并发 数据库压力 结果
传统方式 10万 QPS 10万 QPS 数据库崩溃
消息队列 10万 QPS 1000 TPS(可控) 系统稳定

4. 日志收集与数据聚合

场景描述:微服务架构下,多个服务的日志需要统一收集到 ELK(Elasticsearch + Logstash + Kibana)。

架构设计

┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│ 订单服务      │   │ 用户服务      │   │ 支付服务      │
│ (写入日志)    │   │ (写入日志)    │   │ (写入日志)    │
└──────┬───────┘   └──────┬───────┘   └──────┬───────┘
       │                   │                   │
       └───────────────────┴───────────────────┘
                           │
                           ↓
               ┌──────────────────────┐
               │    Kafka (消息队列)   │
               │   (日志缓冲)          │
               └──────────┬───────────┘
                          │
                          ↓
               ┌──────────────────────┐
               │   Logstash (消费者)   │
               │   (日志处理)          │
               └──────────┬───────────┘
                          │
                          ↓
               ┌──────────────────────┐
               │  Elasticsearch        │
               │  (日志存储与检索)      │
               └──────────────────────┘

生产者(业务服务)

// 使用 Logback 异步写入 Kafka
<!-- logback-spring.xml -->
<appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder>
        <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n</pattern>
    </encoder>
    <topic>app-logs</topic>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    <producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</appender>

消费者(Logstash)

# logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs"]
    codec => "json"
  }
}

filter {
  # 解析日志
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}

5. 分布式事务(最终一致性)

场景描述:电商下单需要扣减库存、扣款、创建订单,涉及多个系统。

基于消息队列的最终一致性方案

// 1. 订单服务:创建订单并发送消息
@Transactional
public void createOrder(Order order) {
    // 保存订单(本地事务)
    orderMapper.insert(order);
    
    // 发送消息(事务消息,确保与本地事务一致性)
    rocketMQTemplate.sendMessageInTransaction(
        "order.created", 
        new OrderCreatedEvent(order), 
        null
    );
}

// 2. 库存服务:消费消息扣减库存
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    @Transactional
    public void onMessage(OrderCreatedEvent event) {
        // 扣减库存
        boolean success = inventoryService.deductStock(event.getOrder());
        
        if (!success) {
            // 库存不足,发送回滚消息
            mqTemplate.convertAndSend("order.cancel", event.getOrderId());
            throw new RuntimeException("库存不足");
        }
    }
}

// 3. 支付服务:消费消息扣款
@RocketMQMessageListener(topic = "order.created", consumerGroup = "payment-group")
public class PaymentConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    @Transactional
    public void onMessage(OrderCreatedEvent event) {
        boolean success = paymentService.deductBalance(event.getUserId(), event.getAmount());
        
        if (!success) {
            // 余额不足,发送回滚消息
            mqTemplate.convertAndSend("order.cancel", event.getOrderId());
            throw new RuntimeException("余额不足");
        }
    }
}

// 4. 订单服务:消费回滚消息
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "order-group")
public class OrderCancelConsumer implements RocketMQListener<Long> {
    @Override
    @Transactional
    public void onMessage(Long orderId) {
        // 取消订单
        orderService.cancelOrder(orderId);
    }
}

核心原理

  • 使用 RocketMQ 事务消息 保证消息发送与本地事务的一致性
  • 通过 补偿机制(回滚消息)实现最终一致性
  • 比 2PC/3PC 更高效,适合分布式场景

6. 延迟任务与定时任务

场景描述:订单创建后30分钟未支付自动取消。

使用 RabbitMQ 死信队列 + TTL 实现延迟队列

// 1. 配置延迟队列
@Configuration
public class DelayQueueConfig {
    
    // 业务队列(30分钟后消息进入此队列)
    @Bean
    public Queue orderTimeoutQueue() {
        return new Queue("order.timeout", true);
    }
    
    // 延迟队列(消息先进入此队列)
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "");  // 死信交换机
        args.put("x-dead-letter-routing-key", "order.timeout");  // 死信路由键
        args.put("x-message-ttl", 30 * 60 * 1000);  // 30分钟
        return new Queue("order.delay", true, false, false, args);
    }
}

// 2. 生产者:创建订单时发送延迟消息
@PostMapping("/order")
public Result createOrder(Order order) {
    // 保存订单
    orderService.saveOrder(order);
    
    // 发送延迟消息(30分钟后触发)
    rabbitTemplate.convertAndSend("order.delay", order.getId());
    
    return Result.success();
}

// 3. 消费者:30分钟后检查订单状态
@RabbitListener(queues = "order.timeout")
public void handleOrderTimeout(Long orderId) {
    Order order = orderService.getById(orderId);
    
    // 检查订单是否已支付
    if (order.getStatus() == OrderStatus.UNPAID) {
        // 取消订单
        orderService.cancelOrder(orderId);
        
        // 恢复库存
        inventoryService.restoreStock(order.getProductId(), order.getQuantity());
    }
}

使用 RocketMQ 延迟消息

// RocketMQ 支持18个延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
Message message = MessageBuilder.withPayload(orderId.toString()).build();
rocketMQTemplate.syncSend("order.timeout", message, 3000, 16);  // 延迟级别16 = 30分钟

消息队列选型对比

场景 推荐 MQ 原因
高吞吐量日志收集 Kafka 单机百万级 QPS,适合大数据场景
金融级可靠性 RocketMQ 支持事务消息,消息零丢失
复杂路由规则 RabbitMQ 支持多种交换机类型(Topic、Fanout、Direct)
延迟任务 RabbitMQ / RocketMQ RabbitMQ(死信队列)、RocketMQ(延迟消息)
分布式事务 RocketMQ 原生支持事务消息
实时性要求高 RabbitMQ / RocketMQ Kafka 批量发送,实时性稍差

实战注意事项

1. 消息幂等性

// 使用 Redis 防止重复消费
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    public void onMessage(OrderCreatedEvent event) {
        String msgId = event.getMsgId();
        
        // 检查消息是否已处理(基于 Redis SETNX)
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            "msg:consumed:" + msgId, "1", 24, TimeUnit.HOURS
        );
        
        if (Boolean.FALSE.equals(success)) {
            // 消息已处理,直接返回
            return;
        }
        
        // 处理业务逻辑
        inventoryService.deductStock(event.getOrder());
    }
}

2. 消息可靠性

// RocketMQ 同步发送(确保消息不丢失)
SendResult sendResult = rocketMQTemplate.syncSend("order.created", event);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    throw new RuntimeException("消息发送失败");
}

// RabbitMQ 发送确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        log.error("消息发送失败:{}", cause);
        // 重试或告警
    }
});

3. 消费失败重试

// 设置最大重试次数,避免死循环
@RocketMQMessageListener(
    topic = "order.created", 
    consumerGroup = "inventory-group",
    maxReconsumeTimes = 3  // 最多重试3次
)
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
    @Override
    public void onMessage(OrderCreatedEvent event) {
        try {
            inventoryService.deductStock(event.getOrder());
        } catch (Exception e) {
            // 重试3次后仍失败,进入死信队列
            log.error("库存扣减失败:{}", event, e);
            throw e;
        }
    }
}

答题总结

消息队列的6大核心使用场景

  1. 异步处理:提升响应速度(如用户注册发送邮件/短信)
  2. 系统解耦:降低系统间依赖(如订单系统与库存、积分、物流解耦)
  3. 流量削峰:应对突发流量(如秒杀场景)
  4. 日志收集:统一收集分布式日志(如 ELK + Kafka)
  5. 分布式事务:实现最终一致性(如订单、库存、支付)
  6. 延迟任务:定时触发业务逻辑(如订单超时自动取消)

选型建议

  • Kafka:大数据、日志收集、高吞吐量场景
  • RocketMQ:金融级可靠性、分布式事务、延迟消息
  • RabbitMQ:复杂路由、延迟队列、实时性要求高

核心技术点

  • 消息幂等性:Redis SETNX 防止重复消费
  • 消息可靠性:同步发送 + 发送确认 + 持久化
  • 消费失败重试:设置最大重试次数 + 死信队列

面试技巧:建议从异步处理系统解耦两个最常见场景切入,再根据面试官提问深入到流量削峰、分布式事务等高级场景,并结合实际项目经验说明选型理由。