在 Redis 中有一種有序集合(sorted set,zset),它的底層由壓縮列表(ziplist)或「字典 + 跳表(skiplist)」實現。我們先看下對應的數據結構:
壓縮列表:
跳表:
下載下來4.0的源碼 https://download.redis.io/releases/redis-4.0.0.tar.gz
對應的源碼:
src/server.h
/* Maximum number of levels a skiplist node can have; zslRandomLevel() never
 * returns more than this. */
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
/* Promotion probability used by zslRandomLevel(): every extra level is
 * granted with probability 1/4, so each level is expected to hold roughly a
 * quarter of the nodes of the level below it. */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* Sorted set in its skiplist encoding: the same members are indexed twice,
 * once in a dict and once in a skiplist. */
typedef struct zset {
/* Lookup by member; zsetAdd() stores a pointer to the node's score as the
 * dict value. */
dict *dict;
/* Skiplist holding the same members together with their scores. */
zskiplist *zsl;
} zset;
/* Skiplist container: entry points plus bookkeeping for the whole list. */
typedef struct zskiplist {
/* Head node and tail node. */
struct zskiplistNode *header, *tail;
/* Length of the list (presumably the number of nodes -- confirm against
 * zslInsert/zslDelete). */
unsigned long length;
/* Current number of index levels in use. */
int level;
} zskiplist;
/* A single skiplist node: one member, its score, and its per-level links. */
typedef struct zskiplistNode {
/* The member, stored as an sds dynamic string. */
sds ele;
/* Score that determines the ordering. */
double score;
/* Backward pointer. */
struct zskiplistNode *backward;
/* Index levels; a flexible array member, so the number of levels is fixed
 * when the node is allocated. */
struct zskiplistLevel {
/* Forward pointer to the next node on this level. */
struct zskiplistNode *forward;
/* Number of nodes this level's forward link spans. */
unsigned int span;
} level[];
} zskiplistNode;
src/t_zset.c 源碼:
/* Returns a random level for a new skiplist node.
 *
 * The level starts at 1 and grows by one for every consecutive draw whose low
 * 16 bits of random() fall below ZSKIPLIST_P * 0xFFFF, so higher levels are
 * progressively less likely. The result is clamped to ZSKIPLIST_MAXLEVEL. */
int zslRandomLevel(void) {
    /* Kept as a double: the product is not an integer for P = 0.25. */
    const double promote_below = ZSKIPLIST_P * 0xFFFF;
    int lvl;

    for (lvl = 1; (random() & 0xFFFF) < promote_below; lvl++) {
        /* Each successful draw promotes the node by one level. */
    }

    if (lvl >= ZSKIPLIST_MAXLEVEL) {
        return ZSKIPLIST_MAXLEVEL;
    }
    return lvl;
}
我們看下zadd的源碼:
/* This generic command implements both ZADD and ZINCRBY.
 *
 * 'c' is the client issuing the command. 'flags' holds the ZADD_* input flags
 * preset by the caller (zaddCommand passes ZADD_NONE, zincrbyCommand passes
 * ZADD_INCR); the NX / XX / CH / INCR option words found after the key are
 * OR-ed into it.
 *
 * All scores are parsed before anything is modified, so a malformed score
 * aborts the command without touching the sorted set.
 *
 * Reply: with INCR, the new score, or a null bulk when nothing was processed;
 * otherwise the number of added elements (added + updated when CH is given). */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    while (scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. Every score must parse as a
     * double. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* Pick the initial encoding: the ziplist is skipped when it is
         * disabled (max entries == 0) or when the first member is already
         * longer than the ziplist value limit. */
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();          /* Skiplist encoding. */
        } else {
            zobj = createZsetZiplistObject();   /* Ziplist encoding. */
        }
        /* Register the freshly created object in the database. */
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = flags;

        /* Member of the j-th pair:
         * ZADD key score member [[score member] [score member] ...] */
        ele = c->argv[scoreidx+1+j*2]->ptr;
        /* The actual insert/update logic lives in zsetAdd(). */
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        /* A zero return is reported to the client as the NaN error. */
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReply(c,shared.nullbulk);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}
/* ZADD entry point: delegates to zaddGenericCommand() with no preset flags
 * (ZADD_NONE); options are taken from the command arguments only. */
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}
/* ZINCRBY entry point: delegates to zaddGenericCommand() with ZADD_INCR
 * preset, so the call takes the INCR code path. */
void zincrbyCommand(client *c) {
zaddGenericCommand(c,ZADD_INCR);
}
/* Add 'ele' with 'score' to the sorted set 'zobj', or update its score.
 *
 * '*flags' is both input and output. On input it may carry ZADD_INCR,
 * ZADD_NX and ZADD_XX; it is then cleared and set to exactly one outcome:
 *   ZADD_ADDED   - the element was not present and has been inserted
 *   ZADD_UPDATED - the element was present and its score changed
 *   ZADD_NOP     - nothing was done because of NX or XX
 *   ZADD_NAN     - the input or the incremented score is NaN
 * When the element exists and the score is unchanged, no output flag is set.
 *
 * If 'newscore' is not NULL it receives the resulting score when an element
 * is inserted or incremented.
 *
 * Returns 1 on success (including the NOP cases) and 0 only for the NaN
 * error, in which case the sorted set is left untouched.
 *
 * isnan() comes from <math.h>. */
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (*flags & ZADD_INCR) != 0;
    int nx = (*flags & ZADD_NX) != 0;
    int xx = (*flags & ZADD_XX) != 0;
    double curscore;

    *flags = 0; /* From here on '*flags' only carries the response flags. */

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *flags = ZADD_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Element exists: delete and re-insert when the score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Insert first, then convert to the skiplist encoding if either
             * ziplist limit (zset_max_ziplist_entries or
             * zset_max_ziplist_value) is now exceeded. */
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (sdslen(ele) > server.zset_max_ziplist_value)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *flags |= ZADD_ADDED;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            /* The dict value points at the score stored in the node. */
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Element exists: delete and re-insert when the score changed. */
            if (score != curscore) {
                zskiplistNode *node;
                serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
                znode = zslInsert(zs->zsl,score,node->ele);
                /* The new node reuses node->ele, so detach it before
                 * freeing the old node. */
                node->ele = NULL;
                zslFreeNode(node);
                /* Note that we did not remove the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* Take a private sds copy of the member; the same copy is
             * passed to both the skiplist and the dict. */
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *flags |= ZADD_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}
在redis的官方文檔中 https://raw.githubusercontent.com/redis/redis/4.0/redis.conf 有個配置參數:
# Similarly to hashes and lists, sorted sets are also specially encoded in
# order to save a lot of space. This encoding is only used when the length and
# elements of a sorted set are below the following limits:
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
簡單的梳理下流程:
通過以上資料,我們可以推斷下:
在小數據量的時候,壓縮列表的性能不會比跳表差太多;
通過數據結構的對比,壓縮列表在有序性上的性能損耗相對較大,插入數據需要將數據後移;
跳表對數據的插入相對比較友好,直接修改指針就行;
跳表需要注意多級索引的退化;Redis 並不是在特定時機修改或重構索引,而是在插入節點時通過 zslRandomLevel 以 1/4 的概率隨機決定節點的層數,以概率的方式保持索引平衡;
zset 的應用場景:
滑動窗口限流,如:生產上要對外部調用系統進行限流,每分鐘最多只能有 1000 次調用
使用命令:
延遲隊列
使用命令:
排行榜
zadd ranking_list 1 a 初始化
ZINCRBY ranking_list 1 a 瀏覽量+1
ZREVRANGE ranking_list 0 9 按score從大到小取10條