redis设计与实现——quicklist

quicklist

概述

A doubly linked list of ziplists

根据quicklist.c的注释,这种数据结构是一个以ziplist为节点的双向链表。在redis3.2之后,quicklist取代了压缩列表和linkedlist,成为了列表对象的唯一编码形式。commit 记录

之所以这样设计,是因为原先的linkedlist由于各个节点都是单独的内存,很容易造成内存碎片;而对于压缩列表,由于其每次修改都会引发内存的重新分配,导致大量的内存拷贝。经过对时间和空间的折中,选择了quicklist这种方法。

数据结构

首先来看节点,

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct quicklistNode {
struct quicklistNode *prev; // 前一个节点
struct quicklistNode *next; // 后一个节点
unsigned char *zl; // ziplist结构,压缩的ziplist会指向一个quicklistLZF结构
unsigned int sz; /* ziplist的大小 */
unsigned int count : 16; /* ziplist的item个数*/
unsigned int encoding : 2; /* ziplist是否压缩,1没有压缩,2压缩*/
unsigned int container : 2; /* 目前固定为2,表示使用ziplist作为数据容器 */
unsigned int recompress : 1; /* 是否压缩,1表示压缩。有些命令需要做解压,因此用该标记以便后续压缩*/
unsigned int attempted_compress : 1; /* 暂时不用管,自动测试用的 */
unsigned int extra : 10; /* 扩展字段,目前还没被使用,刚好凑成32bit */
} quicklistNode;

然后quicklist这个结构体将上面节点表示连起来:

1
2
3
4
5
6
7
8
typedef struct quicklist {
quicklistNode *head; // 头部节点
quicklistNode *tail; // 尾部节点
unsigned long count; /* ziplist的item个数总和 */
unsigned long len; /* 节点个数 */
int fill : 16; /* 单个ziplist的大小设置 */
unsigned int compress : 16; /* 节点的压缩设置 */
} quicklist;

fill的设置与单个quicklistNode的大小有关,当该值为正数时,表示节点指向的ziplist的数据项个数,因此16bit可以最多表示32k的个数;当该值为负数时,表示单个节点最多存储大小。(-1:4kb, -2:8kb, -3:16kb, -4:32kb, -5:64kb)。默认是-2,8kb。

1
2
3
4
5
6
7
8
9
#define FILL_MAX (1 << 15)
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
} else if (fill < -5) {
fill = -5;
}
quicklist->fill = fill;
}

创建

1
2
3
4
5
6
7
8
9
10
11
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;

quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL; // 不包含多余的头部节点
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2; // 默认的fill是-2,8kb
return quicklist;
}

插入push

push操作是通过quicklistpush实现的:

1
2
3
4
5
6
7
8
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
quicklistPushHead(quicklist, value, sz); // push头部
} else if (where == QUICKLIST_TAIL) {
quicklistPushTail(quicklist, value, sz); // push尾部
}
}

插入头部的函数,返回0表示已经存在头部,返回1表示创建了新的头部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#if __GNUC__ >= 3
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
// 判断ziplist大小是否超过限制
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 插数据到头部节点
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();//新建一个节点
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);// 新建一个ziplist并插入一个节点

quicklistNodeUpdateSz(node); // 更新节点的sz
_quicklistInsertNodeBefore(quicklist, quicklist->head, node); // 将该节点插入到原头部节点之前
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

检查ziplist的大小是否满足要求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;

int ziplist_overhead;
/* size of previous offset */
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;

/* size of forward offset */
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;

/* 忽略sz被编码成整数的情况*/
unsigned int new_sz = node->sz + sz + ziplist_overhead;
// 检查fill为负数时,是否超过容量大小
// 五种情况:static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz)) // fill为正数时,不能超过#define SIZE_SAFETY_LIMIT 8192,8kb
return 0;
else if ((int)node->count < fill) // fill为正数时,检查节点的ziplist项数
return 1;
else
return 0;
}

节点压缩

前面提到过,如果当前的节点需要进行压缩,zl数据指针将指向quicklistLZF结构体。

1
2
3
4
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes, compressed的长度*/
char compressed[]
} quicklistLZF;

具体的压缩操作函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Compress the ziplist in 'node' and update encoding details.
* Returns 1 if ziplist compressed successfully.
* Returns 0 if compression failed or if ziplist too small to compress. */
REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
node->attempted_compress = 1;
#endif

/* 小于 #define MIN_COMPRESS_BYTES 48 不进行压缩 */
if (node->sz < MIN_COMPRESS_BYTES)
return 0;

quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz); // 分配内存

/* 如果压缩失败,或者压缩尺寸不够,节省的空间不足8字节则取消 */
// #define MIN_COMPRESS_IMPROVE 8
if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed,
node->sz)) == 0) ||
lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) {
/* lzf_compress aborts/rejects compression if value not compressable. */
zfree(lzf);
return 0;
}
lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz); // 重新分配内存
zfree(node->zl); // 释放原来的节点
node->zl = (unsigned char *)lzf;
node->encoding = QUICKLIST_NODE_ENCODING_LZF;
node->recompress = 0;
return 1;
}

具体lzf压缩算法,可以参考lzf_compress函数