核心概念
消息队列(Message Queue,MQ) 是一种异步通信中间件,通过在应用程序之间传递消息来实现系统解耦。其核心思想是:生产者将消息发送到队列,消费者从队列中获取消息进行处理,生产者和消费者无需直接交互。
核心价值:
- 解耦:降低系统间的依赖关系
- 异步:提升系统响应速度
- 削峰填谷:平滑流量波动
- 可靠性:保证消息不丢失
六大核心使用场景
1. 异步处理(提升响应速度)
场景描述:用户注册后需要发送邮件、短信通知,如果同步执行会导致响应时间过长。
传统同步方式:
// 用户注册(总耗时 = 注册 + 邮件 + 短信 = 50ms + 100ms + 100ms = 250ms)
@PostMapping("/register")
public Result register(User user) {
// 1. 保存用户信息 (50ms)
userService.saveUser(user);
// 2. 发送注册邮件 (100ms)
emailService.sendEmail(user.getEmail());
// 3. 发送短信通知 (100ms)
smsService.sendSms(user.getPhone());
return Result.success("注册成功");
}
使用消息队列优化:
// 用户注册(总耗时 = 注册 + 发送消息 = 50ms + 5ms = 55ms)
@PostMapping("/register")
public Result register(User user) {
// 1. 保存用户信息 (50ms)
userService.saveUser(user);
// 2. 发送消息到队列 (5ms)
mqProducer.send("user.register", new RegisterEvent(user));
return Result.success("注册成功");
}
// 消费者异步处理
@RabbitListener(queues = "user.register")
public void handleRegister(RegisterEvent event) {
// 异步发送邮件
emailService.sendEmail(event.getEmail());
// 异步发送短信
smsService.sendSms(event.getPhone());
}
性能提升:响应时间从 250ms 降低到 55ms,用户体验显著提升。
2. 系统解耦(降低依赖关系)
场景描述:订单系统需要通知多个下游系统(库存、积分、物流),如果直接调用会导致强耦合。
传统强耦合方式:
// 订单服务直接依赖多个系统
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService; // 依赖库存系统
@Autowired
private PointsService pointsService; // 依赖积分系统
@Autowired
private LogisticsService logisticsService; // 依赖物流系统
public void createOrder(Order order) {
// 保存订单
orderMapper.insert(order);
// 通知多个系统(强耦合)
inventoryService.deductStock(order);
pointsService.addPoints(order);
logisticsService.createShipment(order);
}
}
问题:
- 如果新增一个下游系统(如营销系统),需要修改订单服务代码
- 如果某个下游系统不可用,订单创建会失败
使用消息队列解耦:
// 订单服务只需发送事件,无需关心下游系统
@Service
public class OrderService {
@Autowired
private RocketMQTemplate mqTemplate;
public void createOrder(Order order) {
// 保存订单
orderMapper.insert(order);
// 发布订单创建事件
mqTemplate.convertAndSend("order.created", new OrderCreatedEvent(order));
}
}
// 库存服务消费消息
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
inventoryService.deductStock(event.getOrder());
}
}
// 积分服务消费消息
@RocketMQMessageListener(topic = "order.created", consumerGroup = "points-group")
public class PointsConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
pointsService.addPoints(event.getOrder());
}
}
// 物流服务消费消息(新增系统无需修改订单服务)
@RocketMQMessageListener(topic = "order.created", consumerGroup = "logistics-group")
public class LogisticsConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
logisticsService.createShipment(event.getOrder());
}
}
优势:
- 订单服务与下游系统完全解耦
- 新增/删除下游系统无需修改订单服务
- 某个下游系统故障不影响订单创建
3. 流量削峰填谷(应对突发流量)
场景描述:秒杀活动瞬间涌入10万并发请求,数据库无法承受。
传统方式:
// 直接写数据库,瞬间压垮数据库
@PostMapping("/seckill")
public Result seckill(Long productId, Long userId) {
// 直接操作数据库(10万并发 → 数据库崩溃)
boolean success = seckillService.processSeckill(productId, userId);
return success ? Result.success() : Result.fail("秒杀失败");
}
使用消息队列削峰:
// 将请求写入消息队列,慢慢消费
@PostMapping("/seckill")
public Result seckill(Long productId, Long userId) {
// 1. 校验库存(基于 Redis)
if (!redisTemplate.opsForValue().decrement("stock:" + productId) >= 0) {
return Result.fail("库存不足");
}
// 2. 发送消息到队列(10万 QPS → 队列缓冲)
mqProducer.send("seckill.order", new SeckillOrder(productId, userId));
return Result.success("排队中,请稍后查看结果");
}
// 消费者按固定速率消费(如 1000 TPS)
@RabbitListener(queues = "seckill.order", concurrency = "10")
public void processSeckill(SeckillOrder order) {
// 从队列慢慢消费,保护数据库
seckillService.createOrder(order);
}
效果对比:
| 方式 | 瞬时并发 | 数据库压力 | 结果 |
|---|---|---|---|
| 传统方式 | 10万 QPS | 10万 QPS | 数据库崩溃 |
| 消息队列 | 10万 QPS | 1000 TPS(可控) | 系统稳定 |
4. 日志收集与数据聚合
场景描述:微服务架构下,多个服务的日志需要统一收集到 ELK(Elasticsearch + Logstash + Kibana)。
架构设计:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 订单服务 │ │ 用户服务 │ │ 支付服务 │
│ (写入日志) │ │ (写入日志) │ │ (写入日志) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└───────────────────┴───────────────────┘
│
↓
┌──────────────────────┐
│ Kafka (消息队列) │
│ (日志缓冲) │
└──────────┬───────────┘
│
↓
┌──────────────────────┐
│ Logstash (消费者) │
│ (日志处理) │
└──────────┬───────────┘
│
↓
┌──────────────────────┐
│ Elasticsearch │
│ (日志存储与检索) │
└──────────────────────┘
生产者(业务服务):
// 使用 Logback 异步写入 Kafka
<!-- logback-spring.xml -->
<appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n</pattern>
</encoder>
<topic>app-logs</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</appender>
消费者(Logstash):
# logstash.conf
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["app-logs"]
codec => "json"
}
}
filter {
# 解析日志
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
}
5. 分布式事务(最终一致性)
场景描述:电商下单需要扣减库存、扣款、创建订单,涉及多个系统。
基于消息队列的最终一致性方案:
// 1. 订单服务:创建订单并发送消息
@Transactional
public void createOrder(Order order) {
// 保存订单(本地事务)
orderMapper.insert(order);
// 发送消息(事务消息,确保与本地事务一致性)
rocketMQTemplate.sendMessageInTransaction(
"order.created",
new OrderCreatedEvent(order),
null
);
}
// 2. 库存服务:消费消息扣减库存
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
@Transactional
public void onMessage(OrderCreatedEvent event) {
// 扣减库存
boolean success = inventoryService.deductStock(event.getOrder());
if (!success) {
// 库存不足,发送回滚消息
mqTemplate.convertAndSend("order.cancel", event.getOrderId());
throw new RuntimeException("库存不足");
}
}
}
// 3. 支付服务:消费消息扣款
@RocketMQMessageListener(topic = "order.created", consumerGroup = "payment-group")
public class PaymentConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
@Transactional
public void onMessage(OrderCreatedEvent event) {
boolean success = paymentService.deductBalance(event.getUserId(), event.getAmount());
if (!success) {
// 余额不足,发送回滚消息
mqTemplate.convertAndSend("order.cancel", event.getOrderId());
throw new RuntimeException("余额不足");
}
}
}
// 4. 订单服务:消费回滚消息
@RocketMQMessageListener(topic = "order.cancel", consumerGroup = "order-group")
public class OrderCancelConsumer implements RocketMQListener<Long> {
@Override
@Transactional
public void onMessage(Long orderId) {
// 取消订单
orderService.cancelOrder(orderId);
}
}
核心原理:
- 使用 RocketMQ 事务消息 保证消息发送与本地事务的一致性
- 通过 补偿机制(回滚消息)实现最终一致性
- 比 2PC/3PC 更高效,适合分布式场景
6. 延迟任务与定时任务
场景描述:订单创建后30分钟未支付自动取消。
使用 RabbitMQ 死信队列 + TTL 实现延迟队列:
// 1. 配置延迟队列
@Configuration
public class DelayQueueConfig {
// 业务队列(30分钟后消息进入此队列)
@Bean
public Queue orderTimeoutQueue() {
return new Queue("order.timeout", true);
}
// 延迟队列(消息先进入此队列)
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", ""); // 死信交换机
args.put("x-dead-letter-routing-key", "order.timeout"); // 死信路由键
args.put("x-message-ttl", 30 * 60 * 1000); // 30分钟
return new Queue("order.delay", true, false, false, args);
}
}
// 2. 生产者:创建订单时发送延迟消息
@PostMapping("/order")
public Result createOrder(Order order) {
// 保存订单
orderService.saveOrder(order);
// 发送延迟消息(30分钟后触发)
rabbitTemplate.convertAndSend("order.delay", order.getId());
return Result.success();
}
// 3. 消费者:30分钟后检查订单状态
@RabbitListener(queues = "order.timeout")
public void handleOrderTimeout(Long orderId) {
Order order = orderService.getById(orderId);
// 检查订单是否已支付
if (order.getStatus() == OrderStatus.UNPAID) {
// 取消订单
orderService.cancelOrder(orderId);
// 恢复库存
inventoryService.restoreStock(order.getProductId(), order.getQuantity());
}
}
使用 RocketMQ 延迟消息:
// RocketMQ 支持18个延迟级别(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
Message message = MessageBuilder.withPayload(orderId.toString()).build();
rocketMQTemplate.syncSend("order.timeout", message, 3000, 16); // 延迟级别16 = 30分钟
消息队列选型对比
| 场景 | 推荐 MQ | 原因 |
|---|---|---|
| 高吞吐量日志收集 | Kafka | 单机百万级 QPS,适合大数据场景 |
| 金融级可靠性 | RocketMQ | 支持事务消息,消息零丢失 |
| 复杂路由规则 | RabbitMQ | 支持多种交换机类型(Topic、Fanout、Direct) |
| 延迟任务 | RabbitMQ / RocketMQ | RabbitMQ(死信队列)、RocketMQ(延迟消息) |
| 分布式事务 | RocketMQ | 原生支持事务消息 |
| 实时性要求高 | RabbitMQ / RocketMQ | Kafka 批量发送,实时性稍差 |
实战注意事项
1. 消息幂等性
// 使用 Redis 防止重复消费
@RocketMQMessageListener(topic = "order.created", consumerGroup = "inventory-group")
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
String msgId = event.getMsgId();
// 检查消息是否已处理(基于 Redis SETNX)
Boolean success = redisTemplate.opsForValue().setIfAbsent(
"msg:consumed:" + msgId, "1", 24, TimeUnit.HOURS
);
if (Boolean.FALSE.equals(success)) {
// 消息已处理,直接返回
return;
}
// 处理业务逻辑
inventoryService.deductStock(event.getOrder());
}
}
2. 消息可靠性
// RocketMQ 同步发送(确保消息不丢失)
SendResult sendResult = rocketMQTemplate.syncSend("order.created", event);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("消息发送失败");
}
// RabbitMQ 发送确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败:{}", cause);
// 重试或告警
}
});
3. 消费失败重试
// 设置最大重试次数,避免死循环
@RocketMQMessageListener(
topic = "order.created",
consumerGroup = "inventory-group",
maxReconsumeTimes = 3 // 最多重试3次
)
public class InventoryConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
try {
inventoryService.deductStock(event.getOrder());
} catch (Exception e) {
// 重试3次后仍失败,进入死信队列
log.error("库存扣减失败:{}", event, e);
throw e;
}
}
}
答题总结
消息队列的6大核心使用场景:
- 异步处理:提升响应速度(如用户注册发送邮件/短信)
- 系统解耦:降低系统间依赖(如订单系统与库存、积分、物流解耦)
- 流量削峰:应对突发流量(如秒杀场景)
- 日志收集:统一收集分布式日志(如 ELK + Kafka)
- 分布式事务:实现最终一致性(如订单、库存、支付)
- 延迟任务:定时触发业务逻辑(如订单超时自动取消)
选型建议:
- Kafka:大数据、日志收集、高吞吐量场景
- RocketMQ:金融级可靠性、分布式事务、延迟消息
- RabbitMQ:复杂路由、延迟队列、实时性要求高
核心技术点:
- 消息幂等性:Redis SETNX 防止重复消费
- 消息可靠性:同步发送 + 发送确认 + 持久化
- 消费失败重试:设置最大重试次数 + 死信队列
面试技巧:建议从异步处理和系统解耦两个最常见场景切入,再根据面试官提问深入到流量削峰、分布式事务等高级场景,并结合实际项目经验说明选型理由。