摘要:函数考虑了一些与客户端交互的内容,学的时候没必要细看,它主要分为以下两步调用找到排名的节点从这个节点开始遍历个节点下面是的代码从高层向底层累加直到累加的值等于范围查找功能给定一个的范围,从中找出满足该范围的所有节点。
跳跃表是Redis zset的底层实现之一,zset在member较多时会采用跳跃表作为底层实现,它在添加、删除、查找节点上都拥有与红黑树相当的性能,它其实说白了就是一种特殊的链表,链表的每个节点存了不同的“层”信息,用这种分层存节点的方式在查找节点时能跳过些节点,从而使添加、删除、查找操作都拥有了O(logn)的平均时间复杂度。
下面简单介绍一下跳跃表:
跳跃表最低层(第一层)是一个拥有跳跃表所有节点的普通链表,每次在往跳跃表插入链表节点时一定会插入到这个最低层,至于是否插入到上层 就由抛硬币决定(这么说不是很准确,redis里这个概率是1/4而非1/2,为了表述方便先这么说),什么意思呢?假设已经有一个跳跃表,其高度只有一层:
往表中插入节点“7”时,假设插入7时抛硬币的结果是正,则在第二层中插入“7”节点,继续抛一次看看还能不能上到第三层,为反则停止插入,上层不再插入“7”节点了:
同理插入“4”节点假设连续抛两次都抛了正面,第三次抛了反面,则“4”节点会插入到2、3层:
这就是跳跃表最基本的样子。
查找一个节点时,我们只需从高层到低层,一个个链表查找,每次找到该层链表中小于等于目标节点的最大节点,直到找到为止。由于高层的链表迭代时会“跳过”低层的部分节点,所以跳跃表会比正常的链表查找少查部分节点,这也是skiplist名字的由来。
假如我们需要查找节点“5”:
先遍历最高层,发现第三层头结点的下一个节点是“4”,4<5,所以游标定位到“4”节点,但是“4”节点的下一个节点是空,得继续往低层走;第二层也从“4”节点开始,“4”节点在第二层的下一个节点是“7”,7>5,公交车做过头了,回来依旧定位在“4”节点;继续往低层走,第一层“4”节点的下一个节点是“5”,这就找到了。
事实上插入前也需要进行跳跃表查找操作,上文演示的插入流程只是直接用了链表而省略了这一步。
跳跃表有了个大概的了解,接下来我们细说redis里的skiplist
redis的skiplist有两个主要的数据结构,
zskiplistNode:skiplist的节点
zskiplist:用来记录一个skiplist的长度、高度等信息
zskiplistNode的定义typedef struct zskiplistNode { robj *obj;//zset的member double score;//zset的score struct zskiplistNode *backward;//指向上一个节点,用于zrevrange命令 struct zskiplistLevel { struct zskiplistNode *forward;//指向下一个节点 unsigned int span;//到达后一个节点的跨度(两个相邻节点span为1) } level[];//该节点在各层的信息,柔性数组成员 } zskiplistNode;
backward变量是特意为zrevrange*系列命令准备的,目的是为了使跳跃表实现反向遍历,普通跳跃表的实现里是非必要的。
level变量记录了此节点各层的信息:
level[x]->forward表示该节点在第x层的下一个节点。跳跃表节点都会出现在最低层的链表里,所以都会有level[0],通过level[0]->forward实现跳跃表正向遍历,zrange*系列命令就是以此实现的。
level[x]->span表示该节点到第x层的下一个节点跳跃了多少节点。span主要是为了zrank、zrevrank服务,普通跳跃表的实现里是非必要的。
zskiplist的定义typedef struct zskiplist { struct zskiplistNode *header, *tail;//跳跃表头尾节点 unsigned long length;//节点个数 int level;//除头结点外最大的层数 } zskiplist;
zskiplist的头结点不是一个有效的节点,它有ZSKIPLIST_MAXLEVEL层(32层),每层的forward指向该层跳跃表的第一个节点,若没有则为null。
btw:跳跃表节点的层数限制在了32,若想超过32层得连续32次抛硬币都得到正面,这得有足够多的节点,redis限定了抛硬币正面的概率为1/4,所以到达32层的概率为(1/2)^64,一般一台64位的计算机能拥有的最大内存也无法存储这么多zskiplistNode,所以对于基本使用 32层的上限已经足够高了,再高也没必要 浪费头节点的内存。
最终zskiplist内存布局如下:
也可以去掉不必要的部分(backward、obj),作出如下抽象后的图:
下文我们都通过简化图来分析跳跃表的基本操作流程。
zskiplist相关接口 创建一个zskiplist 时间复杂度O(1)创建一个zskiplist就是创建一个高度为ZSKIPLIST_MAXLEVEL(32)的头结点,score为0,member为null。
代码如下:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//画图,32level的头结点 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { //头结点每个level的下一个节点都初始化为null,跨度为0 zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; } //为指定高度的节点分配空间并赋值,insert操作也要用到 zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));//柔性数组成员 zn->score = score; zn->obj = obj; return zn; }
执行完zslCreate后会得到如下布局:
zskiplist插入一个节点 时间复杂度O(logn)插入一个节点时需要做以下工作
要找到新节点的插入位置,只需像上文介绍的那样,从高层向低层找即可。在找的过程中用update[]数组记录每一层插入位置的上一个节点,用rank[]数组记录每一层插入位置的上一个节点在跳跃表中的排名。根据update[]插入新节点,插入完成后再根据rank[]更新跨度信息即可。
ps:redis允许节点有重复的score,当score相同时根据member(代码里的obj指向的字符串)的字典序来排名。
/* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */ int zslRandomLevel(void) {//返回一个随机的层数,不是level的索引是层数 int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的概率加入到上一层中 level += 1; return (levelheader;//x用于迭代zskiplistNode for (i = zsl->level-1; i >= 0; i--) {//从最高层向最底层查询,找到合适的插入位置 /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];//记录每一层插入节点的上一个节点的排名 while (x->level[i].forward &&//当前层的下一个节点存在 (x->level[i].forward->score < score ||//下一个节点的分数小于需要插入的分数 (x->level[i].forward->score == score &&//score相同的情况下,根据member字符串的大小来比较(二进制安全的memcmp) compareStringObjects(x->level[i].forward->obj,obj) < 0))) { rank[i] += x->level[i].span;//每层的跨度 x = x->level[i].forward;//下一个节点 } update[i] = x;//当前层的最后一个小于score的节点 } /* we assume the key is not already inside, since we allow duplicated * scores, and the re-insertion of score and redis object should never * happen since the caller of zslInsert() should test in the hash table * if the element is already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) {//大于之前跳跃表的高度所以没有记录update[i],因为插入的节点有这么高所以要修改这些头结点的信息 for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length;//高出部分的头结点在还没插入当前节点时跨度应该是整张表,插入之后会重新更新这个值 } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ //rank[0]是x在第0层的上一个节点的实际排名,rank[i]是x在第i层的上一个节点的实际排名,它们俩的差值为x在第i层的上一个节点与x之间的距离 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x;//尾节点 zsl->length++; return x; }
假如往图2插入一个score为7的节点,则会按照下图方式所示进行:
找到插入位置(蓝色的线表示查找路径):
假设层数为3,将节点7插入进skiplist并修改附近节点的相关属性:
zskiplist里的删除操作void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj);//member的引用计数-1,防止内存泄漏 zfree(node); }
void zslFree(zskiplist *zsl) { //任何一个节点一定有level[0],所以迭代level[0]来删除所有节点 zskiplistNode *node = zsl->header->level[0].forward, *next; zfree(zsl->header); while(node) { next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl); }
主要分为以下3个步骤:
根据member(obj)和score找到节点的位置(代码里变量x即为该节点,update记录每层x的上一个节点)
调动zslDeleteNode把x节点从skiplist逻辑上删除
释放x节点内存。
/* Delete an element with matching score/object from the skiplist. */ //从skiplist逻辑上删除一个节点并释放该节点的内存 int zslDelete(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && compareStringObjects(x->level[i].forward->obj,obj) < 0))) x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ x = x->level[0].forward;//要删除的节点 if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x);//obj的引用计数-1并释放节点内存 return 1; } return 0; /* not found */ } /* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/ //从skiplist逻辑上删除一个节点(不释放内存,仅改变节点位置关系) //x为要删除的节点 //update为每一层x的上一个节点(为了更新x上一个节点的forward和span属性) void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) {//当前层有x节点 update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span; update[i]->level[i].forward = x->level[i].forward;//跨过x节点 } else {//当前层没有x节点 update[i]->level[i].span -= 1; } } if (x->level[0].forward) {//是否是tail节点 x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//删除了最高层数的节点 zsl->level--; zsl->length--; }
zslDeleteRangeByScore //删除 满足score范围 的节点
zslDeleteRangeByRank //删除 满足排名范围 的节点
zslDeleteRangeByLex //在所有节点的score相同的skiplist中,删除 满足member字典序范围 的节点
代码懒得贴了
范围查找操作 时间复杂度O(log(n)+m), m是范围内元素的个数根据查找范围的类型 zskiplist查找可以分为三类:
rank范围查找
score范围查找
member范围查找
功能:给定一个zero-based排名范围(start,end),从zskiplist中找出满足该范围的所有节点。
zrangeGenericCommand函数考虑了一些与客户端交互的内容,学zskiplist的时候没必要细看,它主要分为以下两步:
1.调用zslGetElementByRank找到排名start+1的节点·······O(logn)
2.从这个节点开始遍历(end-start+1)个节点·······O(m)
下面是zslGetElementByRank的代码:
/* Finds an element by its rank. The rank argument needs to be 1-based. */ //O(logn) zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { //从高层向底层累加span直到累加的值等于rank while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; x = x->level[i].forward; } if (traversed == rank) { return x; } } return NULL; }
功能:给定一个score的范围,从zskiplist中找出满足该score范围的所有节点。
为了方便的表示score范围的开闭区间,redis在server.h里声明了一个表示zset分数区间的类型zrangespec,关于score范围查找的相关函数都会用到它:
/* Struct to hold a inclusive/exclusive range spec by score comparison. */ typedef struct { double min, max; //是否是开闭区间,1为开,0位闭 int minex, maxex; /* are min or max exclusive? */ } zrangespec;
判断一个score与zrangespec区间内最小值、最大值的关系:
//给定value是否>(>=)此范围的下界 //gte(greater than or equal to) static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } //给定value是否<(<=)此范围的上界 //lte(less than or equal to) int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); }
根据上述两个函数,就可以用O(1)时间复杂度判断一个zskiplist的score是否与zrangespec分数区间有交集:
/* Returns if there is a part of the zset is in range. */ //用O(1)的时间复杂度判断zset(zsl)的分数范围是否与给定分数范围(range)有交集。 // //range(6,9] zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在范围内 // int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; /* Test for ranges that will always be empty. */ if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex))) return 0; x = zsl->tail; if (x == NULL || !zslValueGteMin(x->score,range))//尾节点小于范围下界 return 0; x = zsl->header->level[0].forward; if (x == NULL || !zslValueLteMax(x->score,range))//头节点大于范围上界 return 0; return 1;//在 }
genericZrangebyscoreCommand函数也考虑了很多与客户端交互的内容,就学习底层跳跃表实现时没必要细看,我们只需要知道底层是如何做到的即可,主要执行如下步骤:
1.调用zslParseRange把客户端传过来的范围min、max转换成zrangespec区间类型 保存在range变量里。
2.调用zslFirstInRange找到zskiplist中满足range条件的最小节点。(假设是正序的范围查找)·······O(logn)
3.从这个节点开始遍历,直到调用zslValueLteMax()找到最后一个小于range条件上界的节点。·······O(m)
放一下核心的代码zslFirstInRange:
/* Find the first node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ //满足range条件最小的那个 O(logn) zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; int i; /* If everything is out of range, return early. */ if (!zslIsInRange(zsl,range)) return NULL;//保证下面的逻辑一定能找到范围内的节点 x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* Go forward while *OUT* of range. */ while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range)) x = x->level[i].forward; } /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; serverAssert(x != NULL); /* Check if score <= max. */ if (!zslValueLteMax(x->score,range)) return NULL; return x; }
功能:在一个所有节点的score都相同的zskiplist中,找到满足member字符串字典序范围的所有节点。
底层用memcmp比较两个字符串的大小。实现的流程与genericZrangebyscoreCommand很像,有兴趣再看。
zslgetrank 根据member和score获得节点在该skiplist中的rank
zslParseRange 把客户端传过来的范围min、max转换成zrangespec区间类型 返回给range参数。
...
其实作者Antirez已经给出了答复:
https://news.ycombinator.com/item?id=1171423
划重点:They are not very memory intensive. It"s up to you basically.
既然取决于自己,skiplist实现简单就选它了。至于可能的好处和坏处大概整理了一下有这些:
缺点:
比红黑树占用更多的内存,每个节点的大小取决于该节点的层数
空间局部性较差导致缓存命中率低,感觉上会比红黑树更慢
优点:
实现比红黑树简单
比红黑树更容易扩展,作者之后实现zrank指令时没怎么改动代码。
红黑树插入删除时为了平衡高度需要旋转附近节点,高并发时需要锁。skiplist不需要考虑。
一般用zset的操作都是执行zrange之类的操作,取出一片连续的节点。这些操作的缓存命中率不会比红黑树低。
参考资料Skip Lists: A Probabilistic Alternative to Balanced Trees
Skip Lists: Done Right
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/28308.html
摘要:用不用作为分界是因为是的值,它用于判断是否到达尾部。这种类型的节点的大小介于与之间,但是为了表示整数,取出低四位之后会将其作为实际的值。当小于字节时,节点存为上图的第二种类型,高位为,后续位表示的长度。 // 文中引用的代码来源于Redis3.2 前言 Redis是基于内存的nosql,有些场景下为了节省内存redis会用时间换空间。ziplist就是很典型的例子。 介绍 ziplis...
阅读 2415·2021-10-08 10:04
阅读 2701·2021-09-06 15:02
阅读 704·2019-08-30 13:50
阅读 1498·2019-08-30 13:21
阅读 2550·2019-08-30 11:15
阅读 2079·2019-08-29 17:19
阅读 1540·2019-08-26 13:55
阅读 1232·2019-08-26 10:15