问题

Kafka怎么保证消费只消费一次的?

答案

一、核心概念

Kafka 提供了三种消费语义

  1. At Most Once(最多一次):消息可能丢失,但不会重复
  2. At Least Once(至少一次):消息不会丢失,但可能重复
  3. Exactly Once(精确一次):消息既不丢失,也不重复(理想状态

“只消费一次”即 Exactly Once 语义,是 Kafka 最高级别的消费保证。

二、三种消费语义对比

1. At Most Once(最多一次)

实现方式:先提交 Offset,后处理消息

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    consumer.commitSync();  // 1. 先提交 Offset
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // 2. 后处理(失败则丢失)
    }
}

特点

  • 优点:不会重复消费
  • 缺点:消息可能丢失(处理失败时已标记为消费)
  • 适用场景:允许丢失的日志采集、监控指标

2. At Least Once(至少一次)

实现方式:先处理消息,后提交 Offset

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);  // 1. 先处理
    }
    consumer.commitSync();      // 2. 后提交(失败则重复消费)
}

特点

  • 优点:消息不会丢失
  • 缺点:可能重复消费(处理成功但提交失败,或 Consumer 重启)
  • 适用场景:大多数业务场景(配合幂等性设计)

3. Exactly Once(精确一次)

实现方式:幂等性 + 事务 + 业务层去重

特点

  • 优点:消息既不丢失,也不重复
  • 缺点:实现复杂,性能开销大
  • 适用场景:金融支付、订单处理等对准确性要求极高的场景

三、Exactly Once 的实现机制

Kafka 提供了两种级别的 Exactly Once:

  1. Kafka 内部的 Exactly Once(Producer → Broker)
  2. 端到端的 Exactly Once(Producer → Broker → Consumer)

1. Producer 端的幂等性(Idempotence)

作用:避免 Producer 重试导致的消息重复

开启方式

props.put("enable.idempotence", true); // Kafka 3.0+ 默认开启

实现原理

  • Producer 启动时,Broker 分配一个唯一的 PID(Producer ID)
  • 每条消息附带 Sequence Number(序列号)
  • Broker 端检测 <PID, Partition, Sequence> 三元组:
    • 如果重复(重试导致),直接返回成功,不写入
    • 如果乱序(网络延迟),拒绝写入
Producer 发送流程:
  消息1: PID=100, Partition=0, Seq=0 → Broker 写入
  消息2: PID=100, Partition=0, Seq=1 → Broker 写入
  消息2(重试): PID=100, Partition=0, Seq=1 → Broker 去重,返回成功

限制

  • 仅保证单分区、单会话的幂等性
  • Producer 重启后 PID 变化,幂等性失效
  • 无法跨分区、跨会话去重

2. Producer 端的事务(Transaction)

作用:实现跨分区、跨会话的原子性写入

开启方式

// 配置事务 ID
props.put("transactional.id", "my-transactional-id");
props.put("enable.idempotence", true); // 事务自动开启幂等性

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 初始化事务

// 使用事务
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
    producer.commitTransaction(); // 提交事务
} catch (Exception e) {
    producer.abortTransaction(); // 回滚事务
}

实现原理

  • 引入 Transaction Coordinator(事务协调器)
  • 通过 两阶段提交(2PC)保证原子性
  • 事务日志存储在内部 Topic __transaction_state

事务流程

1. Producer 向 Transaction Coordinator 注册事务
2. Producer 发送消息到多个 Partition(标记为事务消息)
3. Producer 提交事务 → Coordinator 写入提交标记
4. Consumer 只消费已提交的事务消息

特点

  • ✅ 跨分区、跨会话的原子性
  • ✅ 配合 isolation.level=read_committed,Consumer 只读取已提交事务
  • ❌ 性能开销大(增加延迟 20%-30%)

3. Consumer 端的 Exactly Once

核心思路:将消费 + 业务处理 + Offset 提交放在同一个事务中

方案1:消费事务(Kafka 事务)

// Consumer 配置
props.put("isolation.level", "read_committed"); // 只读已提交事务

// Producer 配置(用于写入结果)
producerProps.put("transactional.id", "consumer-transaction-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    producer.beginTransaction();
    try {
        // 1. 处理消息
        for (ConsumerRecord<String, String> record : records) {
            String result = processRecord(record);
            // 2. 写入结果到另一个 Topic
            producer.send(new ProducerRecord<>("result-topic", result));
        }

        // 3. 提交 Offset 到事务
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            long offset = records.records(partition).get(records.records(partition).size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(offset + 1));
        }
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

        // 4. 提交事务
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
    }
}

方案2:业务层幂等性(推荐)

通过唯一键约束状态机实现业务层去重:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        String messageId = record.key(); // 消息唯一 ID

        // 1. 检查是否已处理(查询数据库或 Redis)
        if (isProcessed(messageId)) {
            continue; // 跳过已处理的消息
        }

        // 2. 处理消息
        processRecord(record);

        // 3. 标记为已处理(写入数据库,利用唯一键约束)
        markAsProcessed(messageId);
    }

    // 4. 提交 Offset
    consumer.commitSync();
}

数据库实现

CREATE TABLE processed_messages (
    message_id VARCHAR(255) PRIMARY KEY,  -- 唯一键约束
    process_time TIMESTAMP
);

-- 插入时利用唯一键约束去重
INSERT INTO processed_messages (message_id, process_time)
VALUES ('msg-12345', NOW())
ON DUPLICATE KEY UPDATE message_id = message_id; -- MySQL
-- 或
INSERT INTO processed_messages (message_id, process_time)
VALUES ('msg-12345', NOW())
ON CONFLICT (message_id) DO NOTHING; -- PostgreSQL

四、Exactly Once 的实现挑战

1. Consumer Rebalance 导致重复

Consumer 1 正在处理消息,未提交 Offset
  ↓
Consumer 2 加入,触发 Rebalance
  ↓
Partition 重新分配给 Consumer 2
  ↓
Consumer 2 从上次提交的 Offset 开始消费(重复)

解决方案

  • 在 Rebalance 监听器中提交 Offset
  • 使用 Sticky 分区分配策略(减少 Rebalance 频率)
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // Rebalance 前提交
    }
});

2. 网络分区导致重复

Consumer 处理完成,提交 Offset 时网络超时
  ↓
Consumer 重启后,从旧 Offset 开始消费(重复)

解决方案

  • 业务层实现幂等性(推荐)
  • 使用分布式锁(如 Redis)控制并发

3. 事务性能开销

Kafka 事务会引入额外开销:

  • 延迟增加:20%-30%
  • 吞吐量下降:10%-20%

权衡建议

  • 金融场景:使用 Kafka 事务(牺牲性能换准确性)
  • 一般场景:At Least Once + 业务层幂等(性能优先)

五、最佳实践总结

1. 选择合适的消费语义

场景 推荐语义 实现方式
日志采集 At Most Once 先提交后处理
一般业务 At Least Once 先处理后提交 + 业务幂等
金融支付 Exactly Once Kafka 事务 + 业务幂等

2. 实现 Exactly Once 的组合方案

// Producer 配置
props.put("enable.idempotence", true);        // 开启幂等性
props.put("acks", "all");                     // 等待所有 ISR
props.put("retries", Integer.MAX_VALUE);      // 无限重试

// Consumer 配置
props.put("enable.auto.commit", false);       // 手动提交
props.put("isolation.level", "read_committed"); // 只读已提交事务

// 业务层幂等
- 数据库唯一键约束
- Redis 去重SET NX
- 业务状态机已支付已发货等

六、总结

Kafka 如何保证只消费一次?

  1. Producer 端幂等性enable.idempotence=true,避免重试导致重复
  2. Producer 端事务transactional.id + 事务 API,实现跨分区原子性
  3. Consumer 端事务sendOffsetsToTransaction,将 Offset 提交放入事务
  4. 业务层幂等性:唯一键约束、Redis 去重、状态机(最推荐

面试答题要点

  • 三种语义:At Most Once、At Least Once、Exactly Once
  • 幂等性:PID + Sequence Number,单分区单会话去重
  • 事务:Transaction Coordinator + 两阶段提交
  • 业务幂等:数据库唯一键、Redis SET NX、状态机
  • 权衡:Exactly Once 性能开销大,一般场景推荐 At Least Once + 业务幂等