一、核心概念
延迟消息(Delayed Message)是指消息发送到 Broker 后,不会立即被消费者消费,而是在指定的延迟时间后才能被消费的一种消息类型。
典型应用场景:
- 订单超时自动取消(30分钟未支付)
- 定时任务触发(每天凌晨数据统计)
- 延迟通知(用户注册7天后发送提醒)
- 限时优惠到期处理
二、延迟消息的使用
1. 基本使用方式
// 创建延迟消息
Message msg = new Message("TopicA", "TagA", "延迟消息内容".getBytes());
// 设置延迟级别(1-18,对应不同延迟时间)
// 3 表示延迟 10 秒
msg.setDelayTimeLevel(3);
// 发送消息
SendResult result = producer.send(msg);
2. 延迟级别对照表
RocketMQ 不支持任意时间的延迟,只支持预设的18个延迟级别:
// MessageStoreConfig.java 中的定义
private String messageDelayLevel =
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
| 延迟级别 | 延迟时间 | 延迟级别 | 延迟时间 |
|---|---|---|---|
| 1 | 1秒 | 10 | 6分钟 |
| 2 | 5秒 | 11 | 7分钟 |
| 3 | 10秒 | 12 | 8分钟 |
| 4 | 30秒 | 13 | 9分钟 |
| 5 | 1分钟 | 14 | 10分钟 |
| 6 | 2分钟 | 15 | 20分钟 |
| 7 | 3分钟 | 16 | 30分钟 |
| 8 | 4分钟 | 17 | 1小时 |
| 9 | 5分钟 | 18 | 2小时 |
3. 实际应用示例
/**
* 订单超时取消场景
*/
public void createOrder(Order order) {
// 1. 创建订单
orderService.save(order);
// 2. 发送延迟消息(30分钟后检查订单状态)
Message msg = new Message(
"ORDER_TIMEOUT_TOPIC",
"CANCEL",
order.getId().toString().getBytes()
);
// 设置延迟级别16(30分钟)
msg.setDelayTimeLevel(16);
producer.send(msg);
}
/**
* 消费者:检查并取消超时订单
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
// 查询订单状态
Order order = orderService.getById(orderId);
// 如果订单仍未支付,执行取消逻辑
if (order.getStatus() == OrderStatus.UNPAID) {
orderService.cancelOrder(orderId);
log.info("订单{}超时未支付,已自动取消", orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
三、延迟消息的实现原理
1. 核心设计思路
RocketMQ 延迟消息的实现不是通过定时器,而是通过特殊的Topic + 后台调度实现:
普通消息流程:
Producer → Topic-QueueId → Consumer
延迟消息流程:
Producer → SCHEDULE_TOPIC_XXXX(替换Topic)
→ 定时调度服务
→ 原始Topic-QueueId → Consumer
2. 实现步骤详解
第一步:消息投递时的 Topic 替换
// ScheduleMessageService.java
// 1. 接收到延迟消息,获取延迟级别
int delayLevel = msg.getDelayTimeLevel();
// 2. 替换 Topic 为内部延迟主题
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // "SCHEDULE_TOPIC_XXXX"
// 3. 计算目标 QueueId(延迟级别 - 1)
int queueId = delayLevel - 1; // delayLevel=3 → queueId=2
// 4. 保存原始 Topic 和 QueueId 到消息属性
msg.putProperty(MessageConst.PROPERTY_REAL_TOPIC, originalTopic);
msg.putProperty(MessageConst.PROPERTY_REAL_QUEUE_ID, originalQueueId);
// 5. 计算投递时间(当前时间 + 延迟时间)
long deliverTime = System.currentTimeMillis() + delayTime;
msg.putProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS, deliverTime);
// 6. 写入 SCHEDULE_TOPIC_XXXX 的对应队列
commitLog.putMessage(msg);
为什么要替换 Topic?
- 避免延迟消息被消费者立即消费
- 所有延迟消息统一管理在
SCHEDULE_TOPIC_XXXX中 - 每个延迟级别对应一个 ConsumeQueue(queueId = delayLevel - 1)
第二步:后台调度服务
// ScheduleMessageService.java
public class ScheduleMessageService {
// 每个延迟级别对应一个定时任务
private final ConcurrentMap<Integer, Timer> deliverExecutorService;
public void start() {
// 为每个延迟级别创建一个定时任务
for (int i = 1; i <= 18; i++) {
long delayTime = delayLevelTable.get(i); // 如 level 3 = 10s
Timer timer = new Timer("ScheduleMessageTimerThread_" + i);
// 定时任务:每隔 delayTime/2 检查一次
timer.schedule(new DeliverDelayedMessageTimerTask(i),
delayTime / 2,
delayTime / 2);
}
}
}
class DeliverDelayedMessageTimerTask extends TimerTask {
private int delayLevel;
@Override
public void run() {
// 1. 从 SCHEDULE_TOPIC_XXXX 的 queueId=(delayLevel-1) 中读取消息
ConsumeQueue cq = findConsumeQueue("SCHEDULE_TOPIC_XXXX", delayLevel - 1);
// 2. 遍历 ConsumeQueue,检查消息的投递时间
for (CqUnit unit : cq) {
long deliverTime = parseDeliverTime(unit);
long now = System.currentTimeMillis();
// 3. 如果到达投递时间,恢复原始 Topic
if (now >= deliverTime) {
MessageExt msg = getMessageByOffset(unit.getPhysicalOffset());
// 恢复原始 Topic 和 QueueId
String realTopic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
String realQueueId = msg.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
msg.setTopic(realTopic);
msg.setQueueId(Integer.parseInt(realQueueId));
// 4. 重新写入 CommitLog(以原始 Topic 的身份)
commitLog.putMessage(msg);
} else {
break; // 未到投递时间,后续消息也不会到
}
}
}
}
3. 数据存储结构
CommitLog 中的延迟消息:
+------------------+
| Topic: SCHEDULE_TOPIC_XXXX
| QueueId: 2 (delayLevel=3)
| Properties:
| - REAL_TOPIC: OrderTopic
| - REAL_QUEUE_ID: 5
| - TIMER_DELIVER_MS: 1699000000000
+------------------+
到期后重新写入 CommitLog:
+------------------+
| Topic: OrderTopic ← 恢复原始Topic
| QueueId: 5 ← 恢复原始QueueId
| 消息内容不变
+------------------+
4. 关键实现细节
1) 为什么每个延迟级别一个 Timer?
- 不同延迟级别的消息到期时间差异大
- 独立调度避免长延迟任务阻塞短延迟任务
- 每个 Timer 只扫描自己对应的 ConsumeQueue
2) 为什么延迟消息要重新写入 CommitLog?
- 保持消息的顺序性和一致性
- 原始 Topic 的 Consumer 只能看到正常的消息
- 便于消息持久化和主从同步
3) 调度精度如何保证?
// 定时任务的执行间隔 = 延迟时间 / 2
// 例如:10秒延迟级别,每5秒扫描一次
// 扫描时的时间判断
if (System.currentTimeMillis() >= deliverTime) {
deliverMessage(); // 立即投递
}
四、延迟消息的限制与优化
1. 主要限制
① 不支持任意时间延迟
// ❌ 不支持:延迟 25 分钟
msg.setDelayTime(25, TimeUnit.MINUTES); // 无此API
// ✅ 只能用预设级别(20分钟或30分钟)
msg.setDelayTimeLevel(15); // 20分钟
② 延迟时间不可修改
- 消息一旦发送,延迟时间无法更改
- 需要取消延迟消息,只能消费后不处理
③ 最大延迟时间 2 小时
- 默认最大延迟级别 18 = 2小时
- 可通过修改配置扩展,但不建议设置过长
2. 自定义延迟级别
# broker.conf
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h
# 重启 Broker 后生效
3. 长延迟时间的替代方案
如果需要超长延迟(如7天后提醒),建议:
方案1:数据库 + 定时任务
// 将延迟任务存入数据库
@Scheduled(cron = "0 */5 * * * ?") // 每5分钟扫描
public void scanDelayedTasks() {
List<Task> tasks = taskMapper.selectDueTasks(new Date());
for (Task task : tasks) {
sendMessage(task); // 发送即时消息
taskMapper.updateStatus(task.getId(), "SENT");
}
}
方案2:多级延迟
// 第一次:延迟2小时
msg.setDelayTimeLevel(18);
// 消费者收到后,判断是否需要继续延迟
if (remainingTime > 0) {
Message newMsg = new Message(topic, body);
newMsg.setDelayTimeLevel(calculateLevel(remainingTime));
producer.send(newMsg); // 递归延迟
} else {
processTask(); // 真正执行任务
}
方案3:RocketMQ 5.0 定时消息(推荐)
// RocketMQ 5.0+ 支持任意时间延迟
Message msg = new Message("TopicA", "Body".getBytes());
// 指定精确的投递时间戳
long deliverTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(7);
msg.setDeliverTimeMs(deliverTime);
producer.send(msg);
五、性能与监控
1. 性能考量
- 延迟消息不影响普通消息:独立的 Topic 和调度线程
- 大量延迟消息:注意磁盘占用(延迟期间消息一直在 CommitLog)
- 调度开销:18个定时线程,CPU 开销较小
2. 监控指标
# 查看延迟消息堆积情况
sh mqadmin topicstatus -n localhost:9876 -t SCHEDULE_TOPIC_XXXX
# 查看各延迟级别的消息数量
sh mqadmin consumerProgress -n localhost:9876 -g SCHEDULE_GROUP
六、面试总结
核心原理
- Topic 替换:延迟消息先发到
SCHEDULE_TOPIC_XXXX - 按级别分队列:18个延迟级别对应18个队列
- 定时扫描:每个级别一个 Timer,定时检查到期消息
- 重新投递:到期后恢复原始 Topic,重新写入 CommitLog
关键代码流程
// 1. 发送时:替换 Topic,保存原始信息
msg.setTopic("SCHEDULE_TOPIC_XXXX");
msg.setQueueId(delayLevel - 1);
msg.setProperty("REAL_TOPIC", originalTopic);
// 2. 定时任务:检查到期时间
if (now >= deliverTime) {
// 恢复原始 Topic,重新投递
msg.setTopic(realTopic);
commitLog.putMessage(msg);
}
答题要点
- 不支持任意延迟:只有18个固定级别
- 实现机制:临时 Topic + 定时扫描 + 重新投递
- 性能隔离:不影响普通消息的吞吐量
- 应用场景:订单超时、定时任务、延迟通知
- 长延迟方案:数据库 + 定时任务,或 RocketMQ 5.0
延伸问题:
- RocketMQ 延迟消息和 RabbitMQ 的 TTL + 死信队列有什么区别?
- 如果要实现任意时间的延迟消息,该如何设计?
- RocketMQ 5.0 的定时消息是如何优化的?