一、问题背景
扫表任务是指需要遍历处理数据库中所有或大量数据的场景,如:
- 数据迁移
- 批量修复脏数据
- 定时统计任务
- 历史数据归档
死循环风险:处理逻辑不当导致反复处理相同数据,永远无法完成。
二、死循环产生的典型场景
场景1:基于LIMIT的翻页问题(最常见)
错误示例1:固定OFFSET翻页
@Scheduled(cron = "0 0 2 * * ?")
public void scanAndUpdateUsers() {
int pageSize = 1000;
int offset = 0;
while (true) {
// 查询一页数据
List<User> users = jdbcTemplate.query(
"SELECT * FROM users LIMIT ? OFFSET ?",
new UserRowMapper(), pageSize, offset
);
if (users.isEmpty()) {
break; // 没有更多数据
}
// 处理数据(包含UPDATE操作)
for (User user : users) {
if (needUpdate(user)) {
// 问题:UPDATE可能改变排序,导致重复扫描
jdbcTemplate.update(
"UPDATE users SET status = ? WHERE id = ?",
"processed", user.getId()
);
}
}
offset += pageSize; // 移动到下一页
}
}
死循环原因:
初始数据(按ID排序):
ID: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
第1页(OFFSET 0, LIMIT 3): 1, 2, 3
处理完成,UPDATE id=2 的某个字段(如last_update_time)
如果查询有ORDER BY last_update_time,排序变化:
ID: 1, 3, 4, 5, 6, 7, 8, 9, 10, 2
第2页(OFFSET 3, LIMIT 3): 5, 6, 7
问题:跳过了 id=4!
或者相反,如果UPDATE导致记录前移:
可能导致 id=2 在下一页再次被扫描(重复处理)
错误示例2:基于时间戳的分页
public void scanByTimestamp() {
Date lastTimestamp = null;
int pageSize = 1000;
while (true) {
List<User> users;
if (lastTimestamp == null) {
users = jdbcTemplate.query(
"SELECT * FROM users ORDER BY create_time LIMIT ?",
new UserRowMapper(), pageSize
);
} else {
users = jdbcTemplate.query(
"SELECT * FROM users WHERE create_time > ? ORDER BY create_time LIMIT ?",
new UserRowMapper(), lastTimestamp, pageSize
);
}
if (users.isEmpty()) {
break;
}
// 处理数据
processUsers(users);
// 记录最后一条的时间戳
lastTimestamp = users.get(users.size() - 1).getCreateTime();
}
}
死循环风险:
假设有1000条记录的 create_time 都是 '2025-11-02 10:00:00'
第1页:查询 LIMIT 1000 → 返回前1000条
最后一条时间戳: 2025-11-02 10:00:00
第2页:WHERE create_time > '2025-11-02 10:00:00'
→ 跳过所有相同时间戳的记录
→ 漏掉数据!
或者如果条件改为 >=:
→ 重复处理相同时间戳的记录
→ 死循环!
场景2:基于标记位的重复处理
public void processUnprocessedOrders() {
while (true) {
// 查询未处理的订单
List<Order> orders = jdbcTemplate.query(
"SELECT * FROM orders WHERE status = 'pending' LIMIT 1000",
new OrderRowMapper()
);
if (orders.isEmpty()) {
break;
}
for (Order order : orders) {
try {
// 处理订单
processOrder(order);
// 更新状态
jdbcTemplate.update(
"UPDATE orders SET status = 'processed' WHERE id = ?",
order.getId()
);
} catch (Exception e) {
// 问题:处理失败,状态未更新,下次继续查出来
log.error("处理失败", e);
// 可能导致死循环:反复处理同一条失败数据
}
}
}
}
死循环场景:
- 某条订单处理必然失败(如数据异常)
- 状态未更新,下次扫描继续查出
- 反复处理同一条失败记录
场景3:数据被并发修改
@Scheduled(fixedRate = 60000)
public void syncDataToES() {
Long lastSyncId = getLastSyncId(); // 从Redis获取上次同步的ID
while (true) {
List<Product> products = jdbcTemplate.query(
"SELECT * FROM products WHERE id > ? ORDER BY id LIMIT 1000",
new ProductRowMapper(), lastSyncId
);
if (products.isEmpty()) {
break;
}
// 同步到ES
for (Product product : products) {
elasticSearchService.index(product);
lastSyncId = product.getId(); // 更新游标
}
// 保存游标到Redis
saveLastSyncId(lastSyncId);
}
}
并发问题:
时刻T1: 任务启动,lastSyncId=1000
时刻T2: 查询 WHERE id > 1000,获取 1001-2000
时刻T3: 正在处理中...
时刻T4: 另一个实例也启动,lastSyncId=1000(Redis未及时更新)
时刻T5: 第二个实例也查询 WHERE id > 1000
→ 重复处理!
或者更严重的情况:
时刻T6: 任务A处理完 id=1500
时刻T7: 任务B从 id=1000 开始,覆盖了游标
→ 永久循环在 1000-1500 之间
三、正确的解决方案
方案1:基于主键的游标遍历(推荐)
@Service
public class UserScanService {
public void scanAndProcess() {
Long lastId = 0L; // 从0开始
int pageSize = 1000;
while (true) {
// 使用主键作为游标
List<User> users = jdbcTemplate.query(
"SELECT * FROM users WHERE id > ? ORDER BY id ASC LIMIT ?",
new UserRowMapper(), lastId, pageSize
);
if (users.isEmpty()) {
log.info("扫描完成,共处理 {} 条", totalProcessed);
break;
}
// 处理数据
for (User user : users) {
processUser(user);
}
// 更新游标:取本批次最后一条的ID
lastId = users.get(users.size() - 1).getId();
log.info("处理批次完成,当前游标: {}", lastId);
}
}
private void processUser(User user) {
// 即使这里UPDATE了user表的其他字段
// 也不影响下一次查询(因为主键不变)
jdbcTemplate.update(
"UPDATE users SET status = ?, updated_time = NOW() WHERE id = ?",
"processed", user.getId()
);
}
}
优势:
- ✅ 主键不会因UPDATE改变
- ✅ 主键有索引,查询高效
- ✅ 不会漏数据或重复扫描
- ✅ 支持断点续传
方案2:分段加锁处理
@Service
public class OrderProcessService {
public void processOrders() {
Long minId = getMinUnprocessedId();
Long maxId = getMaxUnprocessedId();
int batchSize = 1000;
for (long startId = minId; startId <= maxId; startId += batchSize) {
long endId = Math.min(startId + batchSize - 1, maxId);
// 锁定ID范围
List<Order> orders = jdbcTemplate.query(
"SELECT * FROM orders " +
"WHERE id BETWEEN ? AND ? AND status = 'pending' " +
"FOR UPDATE",
new OrderRowMapper(), startId, endId
);
for (Order order : orders) {
try {
processOrder(order);
updateStatus(order.getId(), "processed");
} catch (Exception e) {
// 标记为失败,避免死循环
updateStatus(order.getId(), "failed");
log.error("订单处理失败: {}", order.getId(), e);
}
}
}
}
private Long getMinUnprocessedId() {
return jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM orders WHERE status = 'pending'",
Long.class
);
}
private Long getMaxUnprocessedId() {
return jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM orders WHERE status = 'pending'",
Long.class
);
}
}
关键点:
- 先确定处理范围(minId ~ maxId)
- 按ID段遍历,避免动态查询
- 失败记录标记为
failed而非继续保持pending
方案3:使用处理时间戳+ID组合游标
@Service
public class DataMigrationService {
public void migrateData() {
Timestamp lastTime = new Timestamp(0);
Long lastId = 0L;
int pageSize = 1000;
while (true) {
List<Data> dataList = jdbcTemplate.query(
"SELECT * FROM source_table " +
"WHERE (create_time > ?) OR (create_time = ? AND id > ?) " +
"ORDER BY create_time, id " +
"LIMIT ?",
new DataRowMapper(), lastTime, lastTime, lastId, pageSize
);
if (dataList.isEmpty()) {
break;
}
// 批量迁移
batchInsertToTarget(dataList);
// 更新游标:取最后一条的时间戳+ID
Data lastData = dataList.get(dataList.size() - 1);
lastTime = lastData.getCreateTime();
lastId = lastData.getId();
}
}
}
优势:
- 解决时间戳重复问题(通过ID辅助)
- 支持按时间范围筛选
- 适合增量同步场景
索引要求:
CREATE INDEX idx_time_id ON source_table (create_time, id);
方案4:基于快照的扫描
@Service
public class SnapshotScanService {
public void scanWithSnapshot() {
// 1. 创建待处理ID快照表
jdbcTemplate.update(
"CREATE TEMPORARY TABLE tmp_scan_ids AS " +
"SELECT id FROM users WHERE status = 'pending'"
);
Long lastId = 0L;
int batchSize = 1000;
while (true) {
// 2. 从快照表中取ID
List<Long> ids = jdbcTemplate.queryForList(
"SELECT id FROM tmp_scan_ids " +
"WHERE id > ? ORDER BY id LIMIT ?",
Long.class, lastId, batchSize
);
if (ids.isEmpty()) {
break;
}
// 3. 根据ID列表处理
List<User> users = jdbcTemplate.query(
"SELECT * FROM users WHERE id IN (" +
ids.stream().map(String::valueOf).collect(Collectors.joining(",")) +
")",
new UserRowMapper()
);
for (User user : users) {
processUser(user);
}
lastId = ids.get(ids.size() - 1);
}
// 4. 清理快照表
jdbcTemplate.update("DROP TEMPORARY TABLE tmp_scan_ids");
}
}
优势:
- 完全避免数据变更影响
- 处理范围固定,必然终止
- 适合复杂条件筛选
方案5:幂等性设计+重试上限
@Service
public class IdempotentProcessService {
public void processWithRetryLimit() {
int maxRetry = 3;
while (true) {
List<Task> tasks = jdbcTemplate.query(
"SELECT * FROM tasks " +
"WHERE status = 'pending' AND retry_count < ? " +
"LIMIT 1000",
new TaskRowMapper(), maxRetry
);
if (tasks.isEmpty()) {
break;
}
for (Task task : tasks) {
try {
// 幂等性处理
processTask(task);
jdbcTemplate.update(
"UPDATE tasks SET status = 'success', retry_count = 0 WHERE id = ?",
task.getId()
);
} catch (Exception e) {
// 增加重试次数
jdbcTemplate.update(
"UPDATE tasks SET retry_count = retry_count + 1 WHERE id = ?",
task.getId()
);
log.error("任务处理失败,当前重试次数: {}", task.getRetryCount() + 1, e);
// 达到重试上限,标记为失败
if (task.getRetryCount() + 1 >= maxRetry) {
jdbcTemplate.update(
"UPDATE tasks SET status = 'failed' WHERE id = ?",
task.getId()
);
}
}
}
}
}
private void processTask(Task task) {
// 幂等性实现:重复执行结果一致
// 例如:使用唯一键约束防止重复插入
jdbcTemplate.update(
"INSERT INTO result_table (task_id, result) VALUES (?, ?) " +
"ON DUPLICATE KEY UPDATE result = VALUES(result)",
task.getId(), calculateResult(task)
);
}
}
表结构:
CREATE TABLE tasks (
id BIGINT PRIMARY KEY,
status VARCHAR(20), -- pending/success/failed
retry_count INT DEFAULT 0,
create_time DATETIME,
INDEX idx_status_retry (status, retry_count)
);
方案6:分布式锁防止并发执行
@Service
public class DistributedScanService {
@Autowired
private RedissonClient redissonClient;
@Scheduled(cron = "0 0 2 * * ?")
public void scheduledScan() {
RLock lock = redissonClient.getLock("scan_task_lock");
try {
// 尝试获取锁,最多等待0秒,锁自动释放时间10分钟
boolean acquired = lock.tryLock(0, 600, TimeUnit.SECONDS);
if (!acquired) {
log.warn("扫表任务已在其他实例执行,跳过");
return;
}
log.info("获取锁成功,开始扫表");
scanAndProcess();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private void scanAndProcess() {
// 使用方案1的主键游标遍历
// ...
}
}
四、生产环境最佳实践
1. 完整的扫表任务模板
@Service
public class ProductionScanTemplate {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedissonClient redissonClient;
public ScanResult scanAndProcess(ScanConfig config) {
RLock lock = redissonClient.getLock("scan:" + config.getTaskName());
try {
// 1. 获取分布式锁
if (!lock.tryLock(0, config.getLockTimeout(), TimeUnit.SECONDS)) {
return ScanResult.skipped("任务已在执行");
}
// 2. 恢复上次游标
Long lastId = getLastCursor(config.getTaskName());
int totalProcessed = 0;
int failedCount = 0;
while (true) {
// 3. 批量查询
List<Map<String, Object>> rows = jdbcTemplate.queryForList(
config.getQuerySql(), lastId, config.getBatchSize()
);
if (rows.isEmpty()) {
break;
}
// 4. 批量处理
for (Map<String, Object> row : rows) {
try {
config.getProcessor().accept(row);
totalProcessed++;
} catch (Exception e) {
failedCount++;
log.error("处理失败: {}", row, e);
if (failedCount > config.getMaxFailures()) {
throw new RuntimeException("失败次数超过阈值,任务中止");
}
}
}
// 5. 更新游标
lastId = (Long) rows.get(rows.size() - 1).get("id");
saveCursor(config.getTaskName(), lastId);
// 6. 速率限制
if (config.getRateLimitMs() > 0) {
Thread.sleep(config.getRateLimitMs());
}
}
return ScanResult.success(totalProcessed, failedCount);
} catch (Exception e) {
return ScanResult.failed(e.getMessage());
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private Long getLastCursor(String taskName) {
String key = "scan_cursor:" + taskName;
String value = redisTemplate.opsForValue().get(key);
return value != null ? Long.parseLong(value) : 0L;
}
private void saveCursor(String taskName, Long cursor) {
String key = "scan_cursor:" + taskName;
redisTemplate.opsForValue().set(key, cursor.toString(), 7, TimeUnit.DAYS);
}
}
使用示例:
ScanConfig config = ScanConfig.builder()
.taskName("update_user_level")
.querySql("SELECT * FROM users WHERE id > ? ORDER BY id LIMIT ?")
.batchSize(1000)
.processor(row -> updateUserLevel((Long) row.get("id")))
.maxFailures(100)
.rateLimitMs(100) // 每批次间隔100ms
.lockTimeout(600)
.build();
ScanResult result = scanTemplate.scanAndProcess(config);
2. 监控与告警
@Component
public class ScanTaskMonitor {
@Scheduled(fixedRate = 60000)
public void checkStuckTasks() {
List<TaskMetric> metrics = jdbcTemplate.query(
"SELECT task_name, last_cursor, update_time " +
"FROM scan_task_progress",
new TaskMetricRowMapper()
);
for (TaskMetric metric : metrics) {
long idleMinutes = Duration.between(
metric.getUpdateTime(), Instant.now()
).toMinutes();
if (idleMinutes > 30) {
// 30分钟未更新,可能死循环或卡死
alertService.send(String.format(
"扫表任务 %s 超过30分钟未更新游标,请检查!当前游标: %d",
metric.getTaskName(), metric.getLastCursor()
));
}
}
}
}
五、答题总结
面试回答框架:
-
问题识别:
“扫表任务死循环主要由三类原因导致:一是基于OFFSET或时间戳翻页时,UPDATE改变排序导致重复扫描;二是处理失败但状态未更新,反复查出同一条数据;三是并发执行时游标冲突” -
核心方案:
“最可靠的方法是使用主键作为游标,WHERE id > ? ORDER BY id LIMIT ?,主键不会因UPDATE改变且有索引,既高效又不会漏数据” -
异常处理:
“处理失败的记录要标记为failed或增加retry_count,设置重试上限,避免永远停留在pending状态造成死循环” -
生产实践:
“生产环境还需要加分布式锁防并发、保存游标支持断点续传、监控任务进度,发现长时间未更新要告警”
关键点:
- 理解基于OFFSET翻页的问题
- 掌握主键游标遍历方案
- 强调幂等性和重试上限设计
- 能说出完整的异常处理策略