11. AVL树和有序集合
# 11. AVL树和有序集合
在上一章AVL树的基础上,有序集合(sorted set)这种数据结构就很容易添加啦。结构定义如下:
struct ZSet {
AVLNode *tree = NULL;
HMap hmap;
};
struct ZNode {
AVLNode tree;
HNode hmap;
double score = 0;
size_t len = 0;
char name[0];
};
static ZNode *znode_new(const char *name, size_t len, double score) {
ZNode *node = (ZNode * )malloc(sizeof(ZNode) + len);
assert(node); // 这在实际项目里可不是个好主意
avl_init(&node->tree);
node->hmap.next = NULL;
node->hmap.hcode = str_hash((uint8_t * )name, len);
node->score = score;
node->len = len;
memcpy(&node->name[0], name, len);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
有序集合是一个由(分数,名称)对组成的有序列表,它支持通过排序键或名称进行查询和更新。它是AVL树和哈希表(hashtable)的组合,而且每个节点对同时属于这两种结构,这就体现了侵入式数据结构(intrusive data structures)的灵活性。名称字符串被嵌入在节点对的末尾,这样能节省点空间开销。
向树中插入的函数和上一章的测试代码差不多:
// 插入到AVL树中
static void tree_add(ZSet *zset, ZNode *node) {
if (!zset->tree) {
zset->tree = &node->tree;
return ;
}
AVLNode *cur = zset->tree;
while (true) {
AVLNode **from = zless(&node->tree, cur) ? &cur->left : &cur->right;
if (! *from) {
*from = &node->tree;
node->tree.parent = cur;
zset->tree = avl_fix(&node->tree);
break;
}
cur = *from;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
zless
是用于比较两个节点对的辅助函数:
// 按(分数,名称)元组进行比较
static bool zless(
AVLNode *lhs, double score, const char *name, size_t len)
{
ZNode *zl = container_of(lhs, ZNode, tree);
if (zl->score != score) {
return zl->score < score;
}
int rv = memcmp(zl->name, name, min(zl->len, len));
if ( rv != 0) {
return rv < 0;
}
return zl->len < len;
}
static bool zless(AVLNode *lhs, AVLNode * rhs) {
ZNode *zr = container_of(rhs, ZNode, tree);
return zless(lhs, zr->score, zr->name, zr->len);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
插入/更新的子例程:
// 更新现有节点的分数(AVL树重新插入)
static void zset_update(ZSet *zset, ZNode *node, double score) {
if (node->score == score) {
return ;
}
zset->tree = avl_del(&node->tree);
node->score = score;
avl_init(&node->tree);
tree_add(zset, node);
}
// 添加一个新的(分数,名称)元组,或者更新现有元组的分数
bool zset_add(ZSet *zset, const char *name, size_t len, double score) {
ZNode *node = zset_lookup(zset, name, len);
if (node) {
zset_update(zset, node, score);
return false;
} else {
node = znode_new(name, len, score);
hm_insert(&zset->hmap, &node->hmap);
tree_add(zset, node);
return true;
}
}
// 按名称查找
ZNode *zset_lookup(ZSet *zset, const char *name, size_t len) {
// 只是哈希表查找
// 代码省略...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
有序集合的主要应用场景是范围查询:
// 查找大于或等于给定参数的(分数,名称)元组,然后根据它进行偏移
ZNode *zset_query(
ZSet *zset, double score, const char *name, size_t len, int64_t offset)
{
AVLNode *found = NULL;
AVLNode *cur = zset->tree;
while (cur) {
if (zless(cur, score, name, len)) {
cur = cur->right;
} else {
found = cur; // 候选节点
cur = cur->left;
}
}
if (found) {
found = avl_offset(found, offset);
}
return found ? container_of(found, ZNode, tree) : NULL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
范围查询就是普通的二叉树查找,再加上一个偏移操作。偏移操作让有序集合变得很特别,它可不是普通的二叉树遍历。
咱们回顾一下AVLNode
:
struct AVLNode {
uint32_t depth = 0;
uint32_t cnt = 0;
AVLNode *left = NULL;
AVLNode * right = NULL;
AVLNode *parent = NULL;
};
1
2
3
4
5
6
7
2
3
4
5
6
7
它有个额外的cnt
字段(树的大小),上一章没讲。这个字段在avl_offset
函数里会用到:
// 偏移到后继或前驱节点
// 注意:无论偏移量有多长,最坏情况下时间复杂度都是O(log(n))
AVLNode *avl_offset(AVLNode *node, int64_t offset) {
int64_t pos = 0; // 相对于起始节点
while (offset != pos) {
if (pos < offset && pos + avl_cnt(node->right) >= offset) {
// 目标节点在右子树内
node = node->right;
pos += avl_cnt(node->left) + 1;
} else if (pos > offset && pos - avl_cnt(node->left) <= offset) {
// 目标节点在左子树内
node = node->left;
pos -= avl_cnt(node->right) + 1;
} else {
// 转到父节点
AVLNode *parent = node->parent;
if (! parent) {
return NULL;
}
if (parent->right == node) {
pos -= avl_cnt(node->left) + 1;
} else {
pos += avl_cnt(node->right) + 1;
}
node = parent;
}
}
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
有了嵌入在节点里的大小信息,我们就能判断偏移目标是否在子树内。偏移操作分两个阶段:首先,如果目标不在子树内,就沿着树向上走;然后再沿着树向下走,逐渐缩小距离,直到找到目标。不管偏移量多长,最坏情况下时间复杂度都是O(log(n)),这可比一个一个往后继节点走(最好情况下时间复杂度为O(偏移量))强多啦。真正的Redis项目在跳跃表(skip lists)里也用了类似的技术。
现在停下来测试一下新的avl_offset
函数是个不错的主意。
static void test_case(uint32_t sz) {
Container c;
for (uint32_t i = 0; i < sz; ++i) {
add(c, i);
}
AVLNode *min = c.root;
while (min->left) {
min = min->left;
}
for (uint32_t i = 0; i < sz; ++i) {
AVLNode *node = avl_offset(min, (int64_t)i);
assert(container_of(node, Data, node)->val == i);
for (uint32_t j = 0; j < sz; ++j) {
int64_t offset = (int64_t)j - (int64_t)i;
AVLNode *n2 = avl_offset(node, offset);
assert(container_of(n2, Data, node)->val == j);
}
assert(!avl_offset(node, -(int64_t)i - 1));
assert(!avl_offset(node, sz - i));
}
dispose(c.root);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
目前,我们已经实现了有序集合的主要功能。现在把有序集合类型添加到咱们的服务器里。
enum {
T_STR = 0,
T_ZSET = 1,
};
// 键的结构
struct Entry {
struct HNode node;
std::string key;
std::string val;
uint32_t type = 0;
ZSet *zset = NULL;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
剩下的代码就比较简单啦,代码清单里就省略了。添加一个Python脚本来测试新命令:
CASES = r'''
$ ./client zscore asdf n1
(nil)
$ ./client zquery xxx 1 asdf 1 10
(arr) len=0
(arr) end
# 更多测试用例...
'''
import shlex
import subprocess
cmds = []
outputs = []
lines = CASES.splitlines()
for x in lines:
x = x.strip()
if not x:
continue
if x.startswith('$ '):
cmds.append(x[2:])
outputs.append('')
else:
outputs[-1] = outputs[-1] + x + '\n'
assert len(cmds) == len(outputs)
for cmd, expect in zip(cmds, outputs):
out = subprocess.check_output(shlex.split(cmd)).decode('utf-8')
assert out == expect, f'cmd:{cmd} out:{out} '
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 练习题
avl_offset
函数让我们能按排名查询有序集合,现在反过来,给定AVL树中的一个节点,找到它的排名,要求最坏情况下时间复杂度为O(log(n))。(这就是zrank
命令。)- 有序集合的另一个应用:统计某个范围内的元素数量。(同样要求最坏情况下时间复杂度为O(log(n))。)
11_server.cpp
文件里已经有一些有序集合命令了,试着再添加一些。
11_client.cpp
11_server.cpp
avl.cpp
avl.h
common.h
hashtable.cpp
hashtable.h
test_cmds.py
test_offset.cpp
zset.cpp
zset.h
上次更新: 2025/03/25, 00:48:42