问题
Kafka如何保证消息不丢失?
答案
一、核心概念
Kafka 消息可靠性需要从生产者、Broker、消费者三个环节综合保障。任何一个环节出现问题都可能导致消息丢失。
二、消息丢失的三种场景
Producer → Broker → Consumer
↓ ↓ ↓
丢失点1 丢失点2 丢失点3
- Producer 端丢失:消息未成功发送到 Broker
- Broker 端丢失:消息写入 Broker 后丢失(磁盘故障、未同步副本)
- Consumer 端丢失:消息被标记为已消费,但业务处理失败
三、Producer 端保证消息不丢失
1. 设置 acks 参数(核心)
props.put("acks", "all"); // 或 acks=-1
acks 参数说明:
- acks=0:Producer 不等待 Broker 确认(最快,但可能丢失)
- acks=1:等待 Leader 副本确认写入(默认,Leader 宕机前未同步会丢失)
- acks=all(-1):等待所有 ISR 副本确认写入(最可靠)
acks=all 工作流程:
Producer → Leader Broker(写入成功)
↓
Follower 1(同步成功)
↓
Follower 2(同步成功)
↓
返回确认给 Producer
2. 配置重试机制
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("retry.backoff.ms", 100); // 重试间隔
props.put("request.timeout.ms", 30000); // 请求超时时间
props.put("delivery.timeout.ms", 120000); // 总超时时间
可重试的错误:
NOT_LEADER_FOR_PARTITION:Leader 切换NETWORK_EXCEPTION:网络异常NOT_ENOUGH_REPLICAS:副本数不足
不可重试的错误:
INVALID_TOPIC_EXCEPTION:Topic 不存在RECORD_TOO_LARGE:消息过大
3. 开启幂等性(避免重复)
props.put("enable.idempotence", true); // 开启幂等性
幂等性保证:
- 避免因重试导致消息重复
- Producer 自动分配 PID(Producer ID)+ Sequence Number
- Broker 端去重,保证单分区、单会话的 Exactly Once
4. 同步发送 + 异常处理
// 方式1:同步发送(可靠但性能低)
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("发送成功:" + metadata.offset());
} catch (Exception e) {
// 发送失败,执行补偿逻辑(如写入死信队列)
handleSendFailure(record, e);
}
// 方式2:异步发送 + 回调(推荐)
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 发送失败处理
handleSendFailure(record, exception);
}
});
四、Broker 端保证消息不丢失
1. 副本机制(Replication)
# Topic 创建时指定副本数
kafka-topics.sh --create --topic my-topic \
--replication-factor 3 \
--partitions 3
副本角色:
- Leader:处理读写请求
- Follower:从 Leader 同步数据,作为备份
- ISR(In-Sync Replicas):与 Leader 保持同步的副本集合
Partition 0 的副本分布:
Leader → Broker 1 (处理读写)
Follower → Broker 2 (同步数据)
Follower → Broker 3 (同步数据)
2. 配置 min.insync.replicas(关键)
# Broker 配置或 Topic 级别配置
min.insync.replicas=2
作用:
- 指定 ISR 中至少有多少个副本写入成功,Producer 才能收到确认
- 配合
acks=all使用,防止 ISR 中只剩 Leader 时丢失数据
最佳实践:
replication-factor=3+min.insync.replicas=2+acks=all- 保证至少 2 个副本写入成功,允许 1 个副本故障
ISR 副本不足时的行为:
ISR = [Leader](只剩一个副本)
↓
acks=all 的 Producer 写入会失败
↓
抛出 NOT_ENOUGH_REPLICAS 异常
↓
Producer 重试,等待副本恢复
3. 禁用 unclean.leader.election(重要)
unclean.leader.election.enable=false # Kafka 1.0+ 默认为 false
作用:
- 禁止从非 ISR 副本中选举 Leader
- 如果所有 ISR 副本都宕机,宁可服务不可用,也不从落后的副本中选举 Leader(避免数据丢失)
权衡:
false:数据可靠性优先(推荐)true:可用性优先(可能丢失数据)
4. 刷盘策略(可选)
# 强制刷盘配置(降低性能)
log.flush.interval.messages=1 # 每条消息刷盘
log.flush.interval.ms=1000 # 每秒刷盘
默认行为:
- Kafka 依赖操作系统的页缓存,不强制刷盘(性能优先)
- 通过副本机制保证可靠性,而非刷盘
生产建议:
- 不建议配置强制刷盘,严重影响性能
- 依赖副本机制 +
acks=all保证可靠性
五、Consumer 端保证消息不丢失
1. 手动提交 Offset(核心)
props.put("enable.auto.commit", false); // 禁用自动提交
自动提交的问题:
- 消费者拉取消息后,Offset 自动提交
- 如果业务处理失败(如数据库写入失败),消息已被标记为消费,导致丢失
手动提交方案:
// 方式1:同步提交(可靠但性能低)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record); // 业务处理
consumer.commitSync(); // 同步提交Offset
} catch (Exception e) {
// 处理失败,不提交Offset,下次重新消费
log.error("处理失败:" + record, e);
}
}
}
// 方式2:异步提交(推荐)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Offset提交失败", exception);
}
});
// 方式3:批量处理 + 手动提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processBatch(records); // 批量处理
consumer.commitSync(); // 批次处理成功后提交
}
2. 先处理后提交(核心原则)
// ✅ 正确做法:先处理,后提交
processRecord(record); // 1. 业务处理(如写数据库)
consumer.commitSync(); // 2. 提交Offset
// ❌ 错误做法:先提交,后处理
consumer.commitSync(); // 1. 提交Offset
processRecord(record); // 2. 业务处理(失败则消息丢失)
3. 处理 Rebalance(避免重复消费)
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Rebalance 前提交当前 Offset
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Rebalance 后从上次提交的位置继续消费
}
});
六、完整的高可靠配置
Producer 配置
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("acks", "all"); // 等待所有ISR确认
producerProps.put("retries", Integer.MAX_VALUE); // 无限重试
producerProps.put("max.in.flight.requests.per.connection", 1); // 保证顺序
producerProps.put("enable.idempotence", true); // 开启幂等性
producerProps.put("compression.type", "lz4"); // 压缩
Broker 配置
replication.factor=3 # 3个副本
min.insync.replicas=2 # 至少2个副本写入
unclean.leader.election.enable=false # 禁止非ISR选举
log.flush.interval.messages=10000 # 可选:刷盘策略
Consumer 配置
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");
consumerProps.put("enable.auto.commit", false); // 手动提交
consumerProps.put("auto.offset.reset", "earliest"); // 从最早位置消费
consumerProps.put("isolation.level", "read_committed"); // 只读已提交消息(事务场景)
七、可靠性与性能的权衡
| 配置 | 高可靠 | 高性能 |
|---|---|---|
| acks | all | 0 或 1 |
| retries | 无限重试 | 0 |
| replication.factor | 3 | 1 |
| min.insync.replicas | 2 | 1 |
| enable.auto.commit | false | true |
| 刷盘 | 每条刷盘 | 依赖OS |
八、总结
Kafka 保证消息不丢失的核心机制:
- Producer 端:
acks=all:等待所有 ISR 副本确认- 开启幂等性:避免重试导致重复
- 同步发送 + 异常处理
- Broker 端:
- 副本机制:
replication-factor ≥ 3 - ISR 机制:
min.insync.replicas=2 - 禁止非 ISR 选举:
unclean.leader.election.enable=false
- 副本机制:
- Consumer 端:
- 手动提交 Offset:
enable.auto.commit=false - 先处理后提交:业务处理成功后再提交
- 处理 Rebalance:避免重复消费
- 手动提交 Offset:
面试答题要点:
- 三端保证:Producer(acks=all + 重试)、Broker(副本 + ISR)、Consumer(手动提交)
- 核心参数:
acks=all+min.insync.replicas=2+enable.auto.commit=false - 权衡点:可靠性 vs 性能、可用性 vs 一致性