问题

用 Stream 做消息队列行不行?

答案

1. 核心结论

可以,但要看场景。Redis Stream 是 Redis 5.0 引入的专门用于消息队列的数据结构,相比 List 和 Pub/Sub 有显著优势,适合轻量级、低延迟、中小规模的消息队列场景。但对于高可靠性、大规模、复杂路由的需求,仍建议使用专业 MQ(如 RabbitMQ、Kafka、RocketMQ)。

2. Redis Stream 的核心特性

基本概念

Stream 是一个只追加的日志结构,每条消息有唯一 ID(时间戳 + 序列号):

# 生产消息
XADD mystream * field1 value1 field2 value2
# 返回:1699999999999-0(时间戳-序列号)

# 消费消息(从头开始)
XREAD COUNT 10 STREAMS mystream 0

# 消费组模式
XGROUP CREATE mystream mygroup 0
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >

核心功能

功能 说明 命令
消息持久化 消息存储在内存(可持久化到 RDB/AOF) XADD
消费组 支持多消费者负载均衡 XGROUP CREATE
消息确认 ACK 机制,防止消息丢失 XACK
Pending 列表 跟踪未确认的消息 XPENDING
消息重试 重新分配超时未确认的消息 XCLAIM
消息回溯 可从任意位置读取历史消息 XREAD
消息裁剪 限制 Stream 长度,防止内存溢出 XTRIM

3. 与其他 Redis 数据结构的对比

List(LPUSH + BRPOP)

# 生产
LPUSH queue "message1"

# 消费
BRPOP queue 0

缺点

  • ❌ 无消费组,无法多消费者负载均衡
  • ❌ 消息消费后立即删除,无法重复消费
  • ❌ 无 ACK 机制,消费者崩溃会丢消息

Pub/Sub

# 发布
PUBLISH channel "message"

# 订阅
SUBSCRIBE channel

缺点

  • ❌ 消息不持久化,订阅者离线会丢失消息
  • ❌ 无消费组,所有订阅者都收到相同消息
  • ❌ 无 ACK 机制,无法保证消息被处理

Stream 的优势

✅ 消息持久化(可配置 RDB/AOF) ✅ 支持消费组(多消费者负载均衡) ✅ ACK 机制 + Pending 列表(防止消息丢失) ✅ 消息回溯(可重复消费历史消息) ✅ 消息 ID 自动生成(有序、唯一)

4. 与专业 MQ 的对比

对比表格

特性 Redis Stream RabbitMQ Kafka RocketMQ
吞吐量 中(10万/s) 中(万级/s) 高(百万/s) 高(十万/s)
延迟 极低(ms) 低(ms) 中(ms-s) 低(ms)
持久化 内存 + RDB/AOF 磁盘 磁盘(顺序写) 磁盘
消息顺序 全局有序 队列有序 分区有序 队列有序
消费模式 消费组 队列/主题 消费组 消费组
消息回溯
事务消息
延迟消息 ❌(需自己实现)
死信队列 ❌(需自己实现)
消息过滤
运维复杂度
集群支持 Redis Cluster 原生集群 原生分布式 原生集群

Redis Stream 的优势

  1. 极低延迟:内存操作,毫秒级响应
  2. 简单易用:无需额外部署,复用 Redis 基础设施
  3. 轻量级:适合中小规模场景
  4. 多功能:Redis 本身还提供缓存、分布式锁等功能

Redis Stream 的劣势

  1. 内存限制:消息存储在内存,成本高,不适合大量积压
  2. 持久化性能:RDB/AOF 持久化会影响性能
  3. 功能缺失:无事务消息、延迟消息、死信队列、消息过滤
  4. 集群复杂度:Redis Cluster 的 Slot 迁移可能影响消息顺序
  5. 无消息路由:不支持复杂的主题订阅和路由规则

5. 适用场景

✅ 适合使用 Redis Stream

  1. 实时日志收集:应用日志、用户行为日志
  2. 轻量级任务队列:异步任务处理(如发送邮件、推送通知)
  3. 实时数据流:监控指标、传感器数据
  4. 消息量不大:每天百万级以下
  5. 低延迟要求:毫秒级响应
  6. 已有 Redis 基础设施:避免引入新组件

❌ 不适合使用 Redis Stream

  1. 海量消息:每天亿级以上(内存成本高)
  2. 长时间积压:消息需要保留数天或数周
  3. 强一致性:金融交易、订单系统(需要事务消息)
  4. 复杂路由:需要主题订阅、消息过滤
  5. 延迟消息:需要定时发送消息
  6. 死信处理:需要自动重试和死信队列

6. 最佳实践

生产者

// 使用 Jedis 客户端
Jedis jedis = new Jedis("localhost", 6379);

// 发送消息
Map<String, String> message = new HashMap<>();
message.put("orderId", "12345");
message.put("amount", "100.00");

StreamEntryID id = jedis.xadd(
    "order-stream",
    StreamEntryID.NEW_ENTRY,  // 自动生成 ID
    message,
    1000000,  // 最大长度(MAXLEN)
    true      // 近似裁剪(~)
);

System.out.println("Message ID: " + id);

消费者(消费组模式)

// 1. 创建消费组(只需执行一次)
try {
    jedis.xgroupCreate("order-stream", "order-group", StreamEntryID.LAST_ENTRY, false);
} catch (JedisDataException e) {
    // 消费组已存在
}

// 2. 消费消息
while (true) {
    // 读取新消息
    List<Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(
        "order-group",
        "consumer-1",
        1,  // COUNT
        0,  // BLOCK(0 表示阻塞)
        false,
        Map.entry("order-stream", StreamEntryID.UNRECEIVED_ENTRY)
    );

    for (Entry<String, List<StreamEntry>> entry : entries) {
        for (StreamEntry e : entry.getValue()) {
            try {
                // 处理消息
                processMessage(e.getFields());

                // 确认消息
                jedis.xack("order-stream", "order-group", e.getID());
            } catch (Exception ex) {
                // 处理失败,消息会保留在 Pending 列表
                ex.printStackTrace();
            }
        }
    }
}

处理 Pending 消息(重试机制)

// 定期检查 Pending 列表
List<StreamPendingEntry> pending = jedis.xpending(
    "order-stream",
    "order-group",
    StreamEntryID.MINIMUM_ID,
    StreamEntryID.MAXIMUM_ID,
    10,
    "consumer-1"
);

for (StreamPendingEntry entry : pending) {
    // 超过 5 分钟未确认,重新分配
    if (entry.getIdleTime() > 300000) {
        List<StreamEntry> claimed = jedis.xclaim(
            "order-stream",
            "order-group",
            "consumer-2",  // 分配给其他消费者
            300000,
            0,
            0,
            false,
            entry.getID()
        );

        // 重新处理
        for (StreamEntry e : claimed) {
            processMessage(e.getFields());
            jedis.xack("order-stream", "order-group", e.getID());
        }
    }
}

消息裁剪(防止内存溢出)

# 方式1:保留最近 100 万条消息
XTRIM mystream MAXLEN ~ 1000000

# 方式2:保留最近 7 天的消息(需要定时任务)
XTRIM mystream MINID <7天前的时间戳>

7. 性能优化

批量操作

// 批量发送
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 1000; i++) {
    pipeline.xadd("mystream", StreamEntryID.NEW_ENTRY, Map.of("data", "value" + i));
}
pipeline.sync();

// 批量消费
jedis.xreadGroup("mygroup", "consumer1", 100, 0, false, ...);

持久化配置

# 方式1:RDB(定期快照,可能丢失部分数据)
save 900 1
save 300 10
save 60 10000

# 方式2:AOF(每秒同步,最多丢失 1 秒数据)
appendonly yes
appendfsync everysec

# 方式3:AOF + RDB 混合持久化(推荐)
aof-use-rdb-preamble yes

集群部署

# Redis Cluster 模式
# 注意:Stream 的所有消息必须在同一个 Slot,使用 Hash Tag
XADD {order}:stream * field value

8. 监控指标

# 查看 Stream 信息
XINFO STREAM mystream

# 查看消费组信息
XINFO GROUPS mystream

# 查看消费者信息
XINFO CONSUMERS mystream mygroup

# 查看 Pending 消息数量
XPENDING mystream mygroup

9. 面试总结

回答要点

  1. 可以用,但要看场景:适合轻量级、低延迟、中小规模的消息队列
  2. 核心优势:消息持久化、消费组、ACK 机制、消息回溯、极低延迟
  3. 主要劣势:内存限制、功能缺失(无事务/延迟/死信)、不适合海量消息
  4. 适用场景:实时日志、轻量级任务队列、实时数据流
  5. 不适用场景:海量消息、长时间积压、强一致性、复杂路由
  6. 最佳实践:消费组模式 + ACK + Pending 重试 + 消息裁剪

进阶问题

  • Redis Stream 如何保证消息不丢失?(AOF 持久化 + ACK 机制 + Pending 列表)
  • 消费者崩溃后如何恢复?(通过 XPENDING 和 XCLAIM 重新分配消息)
  • 如何实现延迟消息?(使用 Sorted Set + 定时任务扫描)
  • Redis Cluster 模式下如何保证消息顺序?(使用 Hash Tag 确保消息在同一 Slot)
  • 什么时候应该从 Redis Stream 迁移到专业 MQ?(消息量超过内存容量、需要事务消息、需要复杂路由)