前沿
在上一篇中,我们着重分析了quicklist的底层数据结构;它其实是双向链表+ziplist,结合两者的优点,扬长避短的结果;本篇主要学习zskiplistNode数据结构及使用方式
zskiplist目标
-
对于一般的链表,查询一个元素的时间复杂度为O(n);即使为有序链表,也无法通过二分缩减复杂度,因为链表地址不连续,没法跳跃查询
-
对于性能要求较高的server,需要高效的查询;红黑树、跳跃表都可以达到差不多的效果,时间复杂度为O(n)
-
在更新数据时,跳表需要更新的部分较少;而红黑树有个再平衡过程,期间要锁定整颗树,在高并发场景难以满足条件
-
跳表增删操作更加局部性,不需要锁定整个表,而且过程相对简单清晰
zskiplist基本性质
-
由很多层结构组成
-
每一层都是一个有序的单向链表
-
最底层(Level 0)的链表包含所有元素,且为双向链表
-
如果一个元素出现在 Level i 的链表中,则它在 Level i 之下的链表也都会出现
图解比较直观,参考资料
本篇主要内容
分析zskiplist底层数据结构及实现方式,分析基本增删操作代码
代码分析
创建跳表 zslCreateNode
/* 以指定的level创建跳跃表节点.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;/* 初始化时,节点的最大层数只有1 */
zsl->length = 0;/* 初始化时,跳跃表中没有节点,长度为0 */
/* 创建表头节点,level为32 */
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
/* 初始化表头各层 */
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
zslInsert
/* zskiplist插入节点:关键在两个数组update[],rank[]
* 1. 从zskiplist头部开始遍历,找到new ele在每一层插入的位置,记录在update[i]中
* 2. zslRandomLevel:确定new ele的层数new level;如果 new level>zskiplist->level,则高出的层数要初始化
* 3. 以传入数据创建新节点,并对new ele每一层插入该new node,更新链表及span
* 4. 更新new ele的backward指针,zskiplist长度+1
*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
/* update数组存储每一层位于插入节点的前一个节点 */
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
/* rank[0]+1 == 新节点的排名
* 其余都是为计算rank[0]产生的中间变量最终目标是rank[0] */
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
/* 从跳表的目前最高层网下,在每一层中找到新节点的插入位置 */
for (i = zsl->level-1; i >= 0; i--) {
/* 可以根据图形想象下,如何计算new ele的最终排名 */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
/* 1. 在本层已经没有后继节点
* 2. 如果forward->score< score || score相等,但是元素排序新元素较大
* 则继续在本层搜索插入点
*/
rank[i] += x->level[i].span;// 记录沿途跨越了多少个节点
x = x->level[i].forward;/* 检查本层下一个节点 */
}
update[i] = x;/* 否则,找到了插入点x,新元素在i层要插在x后面 */
}
/* update[i]->forward即为新节点在i层将要插入的位置 */
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
/* 确定新节点的层数,由大数定律保证当数据量很大时,每一层的节点数量是其上一层的1/2 */
level = zslRandomLevel();
/* 如果计算出来的层数要大于目前zskiplist的最高层数;则要将新增的层数初始化 */
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
/* 由于update[i]->forward后面要跟new ele,因此要初始化扩展的update数组 */
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
/* 以传入的数据创建新节点 */
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
/* 在要插入的每一层中,其实就是链表插入动作;
* 真实的链表是level数组,只需要更新当前层的链表,for循环完成后自然每一层的链表都更新完了 */
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* 计算x在i层新位置的span;插入一个节点,相当于原有的update[i]->level[i].span被截成两段 */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
/* 计算x在i层前一个节点的span */
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* 对于new ele没有触及的高层,要增加节点间的跨度 */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
/* */
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
zslDelete
/* 根据分数及sds删除节点
* 1. 首先逐层搜索可能删除节点的位置,记录在update数组中;但是这不是最终要删节点的数组
* 2. 确定在第0层找到的元素x是否为要删除节点,不是则zskiplist中无该元素
* 3. 如果找到该元素则zslDeleteNode,逐层删除该节点
* 4. 是否该节点内存,返回1表示成功
*/
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
/* 类似insert,从最高层往下逐层链表寻找要删除的节点(匹配score及ele即可) */
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;/* 在每一层链表中找到的要删除node,记录在update[i]中 */
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
/* 第0层的数据是全的,如果第0层都找不到要删除节点,则无该节点 */
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);/* 找到节点x,则要逐层删除该节点 */
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
/* 逐层删除元素x */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
/* 从zsl最高层开始;如果update[i].forward==x,说明i层存在x节点,要删除
* 1. 重新计算span 2. i层单项链表调整
*/
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
/* i层无x,也要调整span值;因为元素个数少了1个
* span:表示记录本节点和其forward节点的距离中间间隔的ele个数 */
update[i]->level[i].span -= 1;
}
}
/* 调整第0层的backward指针 */
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
/* 确定是否要调整跳跃表最大层数(被删除节点是跳跃表中高层链表的唯一元素) */
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
附录
关键数据结构
/* 有序集合跳跃表节点结构 */
typedef struct zskiplistNode {
sds ele;
double score;/* 节点分值,跳跃表中的所有节点都按照分值从小到大排序 */
struct zskiplistNode *backward;/* 后向指针;只在第0层使用 */
struct zskiplistLevel {
struct zskiplistNode *forward;/* 本node在该levle的前向节点 */
unsigned long span;/* 跨度,用于记录本节点和其forward节点的距离中间间隔的ele个数 */
} level[];/* 跳跃表层结构,一个zskiplistLevel数组 */
} zskiplistNode;
/* 有序集合跳跃表结构 */
typedef struct zskiplist {
struct zskiplistNode *header, *tail;/* 跳跃表头尾节点 */
unsigned long length;/* 跳跃表长度,即跳跃表当前的节点数量(表头节点不算在内,因为初始化默认创建表头) */
int level;/* 当前跳跃表中层数最大的节点的层数(表头不算在层数内) */
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset