问题
Redis跳表实现,hash底层实现,为什么使用ziplist,如何扩容?
答案
1. 跳表的实现细节
1.1 跳表核心结构
数据结构定义:
// 跳表节点
typedef struct zskiplistNode {
sds ele; // 元素值
double score; // 分数(排序依据)
struct zskiplistNode *backward; // 后退指针
struct zskiplistLevel {
struct zskiplistNode *forward; // 前进指针
unsigned long span; // 跨度(到下个节点的距离)
} level[]; // 层级数组(柔性数组)
} zskiplistNode;
// 跳表
typedef struct zskiplist {
struct zskiplistNode *header; // 头节点
struct zskiplistNode *tail; // 尾节点
unsigned long length; // 节点数量
int level; // 当前最大层数
} zskiplist;
// ZSet结构(跳表+哈希表)
typedef struct zset {
dict *dict; // 哈希表:element -> score(O(1)查询score)
zskiplist *zsl; // 跳表:score -> element(有序,支持范围查询)
} zset;
1.2 层数生成算法
随机层数:
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25
int zslRandomLevel(void) {
int level = 1;
while ((random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level < ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
概率分布:
1层:100%
2层:25%(0.25概率)
3层:6.25%(0.25²)
4层:1.56%(0.25³)
平均层数 = 1 / (1 - P) = 1 / 0.75 ≈ 1.33层
为什么P=0.25?
- 太大(如0.5):索引层过多,内存开销大
- 太小(如0.1):索引效果差,查找性能下降
- 0.25:经验值,平衡内存和性能
1.3 插入算法实现
核心步骤:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL]; // 记录每层的前驱节点
unsigned long rank[ZSKIPLIST_MAXLEVEL]; // 记录每层的排名
// 1. 查找插入位置(从最高层开始)
zskiplistNode *x = zsl->header;
for (int i = zsl->level - 1; i >= 0; i--) {
rank[i] = (i == zsl->level - 1) ? 0 : rank[i + 1];
// 在当前层向前移动
while (x->level[i].forward != NULL &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x; // 记录前驱节点
}
// 2. 随机生成层数
int level = zslRandomLevel();
if (level > zsl->level) {
// 新层数超过当前最大层数
for (int i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 3. 创建新节点
x = zslCreateNode(level, score, ele);
// 4. 插入各层,更新指针和span
for (int i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
// 更新span(用于计算排名)
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 5. 更新高层的span
for (int i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 6. 更新后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
时间复杂度:平均O(log N)
1.4 为什么同时使用跳表和哈希表?
各自优势:
// 跳表:按score有序
ZRANGE rank 0 9 // O(log N + M),支持范围查询
ZRANGEBYSCORE rank 90 100 // O(log N + M)
// 哈希表:快速查score
ZSCORE rank "player:1001" // O(1),直接查询
内存代价:
- 同一份数据存两份
- 但换来了性能提升(范围查询+快速查分)
2. Hash的底层实现与扩容
2.1 Dict(字典)结构
// 字典
typedef struct dict {
dictht ht[2]; // 两个哈希表(用于渐进式rehash)
long rehashidx; // rehash进度(-1表示未进行)
unsigned long iterators; // 当前运行的迭代器数量
} dict;
// 哈希表
typedef struct dictht {
dictEntry **table; // 哈希表数组
unsigned long size; // 桶数量(2^n)
unsigned long sizemask; // 掩码(size - 1)
unsigned long used; // 已有节点数量
} dictht;
// 哈希节点
typedef struct dictEntry {
void *key;
void *val;
struct dictEntry *next; // 链地址法解决冲突
} dictEntry;
2.2 哈希算法
计算索引:
// 1. 计算哈希值
hash = MurmurHash2(key, keylen, seed);
// 2. 计算桶索引
index = hash & sizemask; // 等价于 hash % size
MurmurHash2特点:
- 速度快
- 分布均匀
- 雪崩效应好
2.3 渐进式Rehash扩容机制
触发条件:
扩容触发:
// 负载因子 = used / size
if (负载因子 >= 1 && 未在BGSAVE/BGREWRITEAOF) {
扩容到 size * 2
}
if (负载因子 >= 5) {
强制扩容(即使在BGSAVE)
}
缩容触发:
if (负载因子 < 0.1) {
缩容到第一个 >= used 的 2^n
}
2.4 渐进式Rehash流程
为什么需要渐进式?
// 假设有1000万个键
// 一次性rehash需要:
// 1. 分配新表(8MB -> 16MB)
// 2. 迁移1000万个键
// 3. 释放旧表
// 耗时:可能数百毫秒,阻塞Redis
渐进式实现:
// 1. 分配ht[1]空间
ht[1].size = ht[0].size * 2;
ht[1].table = malloc(...);
// 2. 设置rehash标志
rehashidx = 0;
// 3. 每次增删改查时,迁移一个桶
void dictRehashStep(dict *d) {
if (d->rehashidx == -1) return;
// 迁移ht[0].table[rehashidx]上的所有键
dictEntry *de = d->ht[0].table[d->rehashidx];
while (de) {
dictEntry *nextde = de->next;
// 计算在ht[1]中的索引
unsigned long h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
// 检查是否完成
if (d->ht[0].used == 0) {
free(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
}
}
// 4. 后台定时任务也会推进rehash
void databasesCron(void) {
// 每次执行1ms内尽可能多迁移
dictRehashMilliseconds(db->dict, 1);
}
2.5 Rehash期间的操作策略
查询:
// 先查ht[0],未找到再查ht[1]
dictEntry *dictFind(dict *d, const void *key) {
if (dictIsRehashing(d)) {
_dictRehashStep(d); // 顺便迁移一个桶
}
// 计算索引
unsigned long h = dictHashKey(d, key);
// 查找ht[0]
for (dictEntry *he = d->ht[0].table[h & d->ht[0].sizemask];
he != NULL; he = he->next) {
if (key == he->key || dictCompareKeys(d, key, he->key))
return he;
}
// 如果在rehash,还要查ht[1]
if (dictIsRehashing(d)) {
for (dictEntry *he = d->ht[1].table[h & d->ht[1].sizemask];
he != NULL; he = he->next) {
if (key == he->key || dictCompareKeys(d, key, he->key))
return he;
}
}
return NULL;
}
新增:
// 直接插入ht[1]
dictEntry *dictAddRaw(dict *d, void *key) {
if (dictIsRehashing(d)) {
_dictRehashStep(d);
}
// 计算索引
unsigned long index;
dictht *ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
index = hash & ht->sizemask;
// 插入链表头部
dictEntry *entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
return entry;
}
删除:
// 两个表都查找并删除
int dictDelete(dict *d, const void *key) {
if (dictIsRehashing(d)) {
_dictRehashStep(d);
}
// 在ht[0]和ht[1]中都尝试删除
unsigned long h = dictHashKey(d, key);
for (int table = 0; table <= 1; table++) {
unsigned long idx = h & d->ht[table].sizemask;
dictEntry *he = d->ht[table].table[idx];
dictEntry *prevHe = NULL;
while (he) {
if (key == he->key || dictCompareKeys(d, key, he->key)) {
// 找到了,删除
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
d->ht[table].used--;
return DICT_OK;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return DICT_ERR;
}
3. 为什么使用ZipList?
3.1 ZipList的优势
内存紧凑:
// LinkedList每个节点的开销
struct listNode {
void *value; // 8字节
listNode *prev; // 8字节
listNode *next; // 8字节
} // 总计24字节指针开销
// 存储100个小元素(每个1字节)
LinkedList: 100 * (24 + 1) = 2500字节
// ZipList: 紧凑存储
ZipList: 4(zlbytes) + 4(zltail) + 2(zllen) + 100 * 3(entry) + 1(zlend)
≈ 311字节
内存节省:(2500 - 311) / 2500 = 87.6%
3.2 缓存友好性
连续内存 vs 离散内存:
ZipList(连续):
[entry1][entry2][entry3][entry4]...
↑ 一次可加载多个entry到CPU缓存
LinkedList(离散):
[node1] -> [node2] -> [node3] -> [node4]
↑ 指针跳跃,缓存命中率低
3.3 使用场景
小对象场景:
# List:元素少、单个元素小
LPUSH list "a" "b" "c" # 3个小元素,用ZipList
# Hash:字段少、值小
HSET user:1001 name "Alice" age "25" # 2个字段,用ZipList
# ZSet:元素少、元素小
ZADD rank 100 "player1" 95 "player2" # 2个元素,用ZipList
配置阈值:
# List
list-max-ziplist-size -2 # 单个ZipList最大8KB
# Hash
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
# ZSet
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
3.4 ZipList的缺点
级联更新问题:
// ZipList: [entry1: 250字节][entry2: 250字节][entry3: 250字节]
// 每个entry的prevlen都是1字节(因为前一个<254字节)
// 插入一个260字节的元素到开头
[entry0: 260字节][entry1: 250字节][entry2: 250字节][entry3: 250字节]
// ↓ prevlen需从1字节扩展到5字节
// ↓ 后续所有entry都可能需要更新
解决方案:ListPack(Redis 7.0+)
// ListPack: 不记录前一个entry的长度
entry = <encoding><data><len> // len是自身长度
// 避免级联更新
4. 编码转换时机
4.1 List的编码转换
QuickList节点大小控制:
# list-max-ziplist-size
-5: 64KB
-4: 32KB
-3: 16KB
-2: 8KB(默认)
-1: 4KB
# 超过限制会创建新QuickListNode
4.2 Hash的编码转换
ZipList -> HashTable:
# 触发条件(满足任一)
1. 元素数量 > 512
2. 单个value大小 > 64字节
# 示例
HSET user:1001 name "Alice" # 1个字段,用ZipList
HSET user:1001 field1 "v1" ... field600 "v600" # 超过512,转HashTable
HSET user:1001 bio "很长的文本........................" # 超过64字节,转HashTable
Java观察:
// 查看编码
redisTemplate.execute((RedisCallback<String>) connection -> {
return new String(connection.execute("OBJECT", "ENCODING", "user:1001".getBytes()));
});
4.3 Set的编码转换
IntSet -> HashTable:
# 触发条件(满足任一)
1. 元素数量 > 512
2. 出现非整数元素
# 示例
SADD nums 1 2 3 # 3个整数,用IntSet
SADD nums 4 5 ... 600 # 超过512,转HashTable
SADD nums "hello" # 出现非整数,转HashTable
4.4 ZSet的编码转换
ZipList -> SkipList:
# 触发条件(满足任一)
1. 元素数量 > 128
2. 单个元素大小 > 64字节
# 示例
ZADD rank 100 "p1" 95 "p2" # 2个元素,用ZipList
ZADD rank 90 "很长的名字........................" # 超过64字节,转SkipList
5. 性能优化建议
5.1 控制对象大小
享受紧凑编码:
// 优化前:大Hash
Map<String, String> bigUser = new HashMap<>();
for (int i = 0; i < 1000; i++) {
bigUser.put("field" + i, "value" + i);
}
redisTemplate.opsForHash().putAll("user:1001", bigUser); // 1000个字段,HashTable
// 优化后:拆分小Hash
Map<String, String> basicInfo = new HashMap<>();
basicInfo.put("name", "Alice");
basicInfo.put("age", "25");
redisTemplate.opsForHash().putAll("user:1001:basic", basicInfo); // 2个字段,ZipList
Map<String, String> extInfo = new HashMap<>();
extInfo.put("city", "Beijing");
redisTemplate.opsForHash().putAll("user:1001:ext", extInfo); // ZipList
5.2 避免大Value
问题:
HSET user:1001 profile "超过64字节的长文本........................"
# 触发转换为HashTable,内存占用增加
优化:
# 大文本单独存储
SET user:1001:profile "长文本........................"
HSET user:1001 name "Alice" age "25" profile_id "user:1001:profile"
# Hash保持ZipList编码
5.3 监控编码变化
脚本监控:
# 批量检查编码
redis-cli --scan --pattern "user:*" | while read key; do
encoding=$(redis-cli OBJECT ENCODING "$key")
echo "$key: $encoding"
done
Java监控:
@Scheduled(fixedRate = 60000)
public void monitorEncoding() {
Set<String> keys = redisTemplate.keys("user:*");
for (String key : keys) {
String encoding = redisTemplate.execute((RedisCallback<String>) connection ->
new String(connection.execute("OBJECT", "ENCODING", key.getBytes()))
);
if ("hashtable".equals(encoding)) {
log.warn("Key {} converted to hashtable", key);
}
}
}
6. 面试答题总结
标准回答模板:
跳表实现:
- 多层索引结构,每个节点随机1-32层(P=0.25,平均1.33层)
- 配合哈希表使用:跳表支持范围查询,哈希表支持O(1)查score
- 插入、删除、查找平均O(log N)
Hash扩容(渐进式rehash):
- 触发条件:负载因子≥1(未BGSAVE)或≥5(强制)
- 流程:分配ht[1],设置rehashidx,每次操作迁移一个桶,后台定时推进
- 期间策略:查询两个表,新增写ht[1],删除两个表都查
为什么用ZipList:
- 内存紧凑:无指针开销,节省87%+内存
- 缓存友好:连续内存,CPU缓存命中率高
- 适合小对象:元素少且小时性能和内存都优
- Redis 7.0+改用ListPack,解决级联更新问题
常见追问:
- 渐进式rehash为什么不一次性完成? → 大表一次性迁移会阻塞Redis数百毫秒
- rehash期间如何查询? → 先查ht[0],未找到再查ht[1]
- ZipList的缺点? → 级联更新问题,ListPack已解决
- 如何避免频繁编码转换? → 控制元素数量和大小在阈值以下