redis设计与实现——压缩列表

redis设计与实现——压缩列表

ziplist压缩列表是有序集合键(另一个是跳表)和哈希键(另一个是字典)的底层实现之一(在3.2之后不再是list的底层实现,被quicklist取代了)。如果一个列表键只包含少量的项,并且每个列表项要么是小整数值,要么是短的字符串,那么Redis就会用ziplist表示列表键。

另外,当一个哈希键只包含少量键值对,并且每个key和value要么是小整数值,要么是短的字符串,那么Redis就会用ziplist表示哈希键。

压缩列表的构成

压缩列表并没有使用一个数据结构去表示。为了节省内存,ziplist是通过特殊编码的连续内存块组成的顺序型结构。我们可以通过其创建步骤来看其结构内容,一个压缩列表包含了多个节点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* ziplist头: 2个32位的整数存总共字节数,1个16位的整数存item */
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
/* 1个字节表示ziplist的最后一个item的size */
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
/* 获取ziplist的zlbytes指针 */
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
/* 获取ziplist的zltail指针 */
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
/* 获取zllen指针 */
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
/* 特殊值,0xFF用来标记压缩列表的末端 */
#define ZIP_END 255

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
zl[bytes-1] = ZIP_END;
return zl;
}

因此压缩列表的结构可以表示为(小端序):

1
2
3
4
5
6
/*
* <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
* [0f 00 00 00] [0c 00 00 00] [02 00] [00 f3] [02 f6] [ff]
* | | | | | |
* zlbytes zltail zllen "2" "5" end
*/

从这个例子中可以看到:

  • zlbytes为0x0f,表示压缩列表总长度为15;
  • zltail为0x0c,表示如果我们有一个指向压缩列表起始地址的指针p,那么只要指针p加上偏移12,则可以得到entry2的地址;
  • zllen为0x02,表示有2个列表节点;
  • zlend是特殊值0xFF,即255,用来标记压缩列表末端;

压缩列表节点的组成

Redis使用了一个结构来表示压缩列表的节点,这个结构体并不是真正的编码方式,只是用来做内部函数操作(主要是使用zipEntry函数根据p指针返回一个zlentry),另外还使用了一个函数来创建节点,压缩列表的节点可以保存一个字节数组或者是一个整数值。

1
2
3
4
5
6
7
8
9
typedef struct zlentry {
unsigned int prevrawlensize; /* prevrawlen的字节数,1或者5,即如果prevrawlen小于254,其就是1*/
unsigned int prevrawlen; /* 前一个节点长度 */
unsigned int lensize; /* 编码len的所需字节大小*/
unsigned int len; /* 当前节点长度 */
unsigned int headersize; /* header大小 = prevrawlensize + lensize. */
unsigned char encoding; /* 节点的编码方式:ZIP_STR_* or ZIP_INT_* */
unsigned char *p; /* 指向节点的指针 */
} zlentry;
1
2
3
4
5
6
7
8
// 根据节点指针p返回一个zlentry
void zipEntry(unsigned char *p, zlentry *e) {

ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
e->headersize = e->prevrawlensize + e->lensize;
e->p = p;
}

为了节省内存,redis的压缩列表使用了节点的encoding记录了节点所保存的数据类型和长度。以下是不同的编码方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0 // 字节数组的编码不会以11开头
#define ZIP_INT_MASK 0x30
#define ZIP_STR_06B (0 << 6) // 小于63的字节数组
#define ZIP_STR_14B (1 << 6) // 小于2^14-1字节的字节数组
#define ZIP_STR_32B (2 << 6) // 小于2^32-1字节的字节数组
#define ZIP_INT_16B (0xc0 | 0<<4) // 11000000,int16_t的整数
#define ZIP_INT_32B (0xc0 | 1<<4) // 11010000,int32_t的整数
#define ZIP_INT_64B (0xc0 | 2<<4) // 11100000,int64_t的整数
#define ZIP_INT_24B (0xc0 | 3<<4) // 11110000,24位有符号整数
#define ZIP_INT_8B 0xfe // 11111110,8位有符号整数

/* 1111xxxx,使用xxxx来保存一个介于0-12的值 */
#define ZIP_INT_IMM_MASK 0x0f
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001,直接编码存储的最小值 */
#define ZIP_INT_IMM_MAX 0xfd /* 11111101,直接编码存储的最大值*/

可以看到,压缩列表的节点保存了整数和字符数组两种类型,并针对不同的长度做了不同的编码解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 检查字符串是否可以被编码成整数
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
long long value;

if (entrylen >= 32 || entrylen == 0) return 0;
// 尝试将entry转换为long long
if (string2ll((char*)entry,entrylen,&value)) {
/* Great, the string can be encoded. Check what's the smallest
* of our encoding types that can hold this value. */
if (value >= 0 && value <= 12) {
*encoding = ZIP_INT_IMM_MIN+value; // 直接将value保存在encoding后四位
} else if (value >= INT8_MIN && value <= INT8_MAX) {
*encoding = ZIP_INT_8B;
} else if (value >= INT16_MIN && value <= INT16_MAX) {
*encoding = ZIP_INT_16B;
} else if (value >= INT24_MIN && value <= INT24_MAX) {
*encoding = ZIP_INT_24B;
} else if (value >= INT32_MIN && value <= INT32_MAX) {
*encoding = ZIP_INT_32B;
} else {
*encoding = ZIP_INT_64B;
}
*v = value;
return 1;
}
return 0;
}

unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
unsigned char len = 1, buf[5];

// #define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
if (ZIP_IS_STR(encoding)) {
/* Although encoding is given it may not be set for strings,
* so we determine it here using the raw length. */
if (rawlen <= 0x3f) { // 如果数组长度小于64,用一个字节进行存储
if (!p) return len; // p为null时,只获取len
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) { // 同理,用两个字节存储
len += 1;
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
buf[1] = rawlen & 0xff;
} else { // 用五个字节存储
len += 4;
if (!p) return len;
buf[0] = ZIP_STR_32B;
buf[1] = (rawlen >> 24) & 0xff;
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
/* Implies integer encoding, so length is always 1. */
if (!p) return len;
buf[0] = encoding;
}

/* 将编码存储到p指针中 */
memcpy(p,buf,len);
return len;
}

因此其编码模式总结就是:

  • 字节数组编码
编码 编码长度
00bbbbbb 1字节 小于64字节的字节数组
01bbbbbb xxxxxxxx 2字节 小于16383字节的字节数组
10______ xxxxxxxx
xxxxxxxx xxxxxxxx xxxxxxxx
5字节 小于等于2^32-1的字节数组,有6个bit留空
  • 整数编码
编码 编码长度
1111xxxx 1字节 在[0,12]区间的整数
11111110 1字节 8bit有符号整数
11110000 1字节 24bit有符号整数
11100000 1字节 int64_t
11010000 1字节 int32_t
11000000 1字节 int16_t

content

虽然redis使用了zlentry作为内部节点的数据结构,但其真实编码表示并不是按照该结构体来计算的。redis对字节数组或者整数的编码方式可以参考节点插入的过程来解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define ZIPLIST_HEAD 0
#define ZIPLIST_TAIL 1

#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))

#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p;
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
return __ziplistInsert(zl,p,s,slen); // 将长度为slen的s插入p所在位置
}

节点的插入是通过上面这个函数实现的,分为从头部和尾部进行插入。至于具体的entry表示方式则要看插入节点的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/* Insert item at "p". */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; // 存储当前的总长和将要存储的列表长度
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;

/* Find out prevlen for the entry that is inserted. */
if (p[0] != ZIP_END) { // 如果p不是列表尾部
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);// 计算prevlensize和prevlen
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);// 获取最后一个节点
if (ptail[0] != ZIP_END) { // 列表不为空
prevlen = zipRawEntryLength(ptail); // 解码,获取最后一个节点的长度,其实就是将要插入节点的上一个节点长度
}
}

/* 尝试编码value */
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
reqlen = zipIntSize(encoding); // 根据encoding获取长度
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
reqlen = slen; // 直接使用字符数组长度
}
/* We need space for both the length of the previous entry and
* the length of the payload. */
reqlen += zipStorePrevEntryLength(NULL,prevlen); // 计算编码prevlen需要的长度
reqlen += zipStoreEntryEncoding(NULL,encoding,slen); // 计算编码encoding需要的长度

/* 如果不是往尾部插入,则需要判断当前prevlen长度是否足够
* 由于我们是用prevlen来存储上一个节点的长度,即prevlen在1或者5个字节间选择 * 因此需要考虑到前一个节点的插入影响了原先的prelem编码长度 */
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}

/* 由于重新分配内存,因此需要记录计算偏移 */
offset = p-zl; // 记录原偏移
zl = ziplistResize(zl,curlen+reqlen+nextdiff); // 重新分配内存
p = zl+offset; // 根据偏移获取p指针

/* Apply memory move when necessary and update tail offset. */
if (p[0] != ZIP_END) {
/* 如果不是在尾部插入,则需要把数据整体往后挪*/
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

/* Encode this entry's raw length in the next entry. */
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

/* 更新tail值 */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* 在尾部插入则直接更新tail_offset */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}

/* 进行级联更新*/
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}

/* Write the entry */
p += zipStorePrevEntryLength(p,prevlen);// 记录prevlen
p += zipStoreEntryEncoding(p,encoding,slen); // 记录encoding
if (ZIP_IS_STR(encoding)) { // 记录字符数组
memcpy(p,s,slen);
} else { // 记录整数
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1); // ziplist的len加1
return zl;
}

因此一个节点的完整编码结构包含了prevlen,encoding和content三个部分,下图就是一个保存着整数值10086的编码结构。

img

级联更新

考虑这样的一种情况,如果目前所有节点的长度都在250-253字节之间,那么意味着记录这些节点只需要1字节长的prevlen。但此时如果将一个长度大于或等于254字节的新节点设置为ziplist的头部节点,那么将直接影响后续所有节点的prevlen编码长度。

Redis将会因此触发级联更新,即遍历所有需要更新的节点进行处理,直到不需要更新为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;

while (p[0] != ZIP_END) {
zipEntry(p, &cur); // 根据p重建zlentry
rawlen = cur.headersize + cur.len; // 当前长度
rawlensize = zipStorePrevEntryLength(NULL,rawlen);

/* Abort if there is no next entry. */
if (p[rawlen] == ZIP_END) break;
zipEntry(p+rawlen, &next);

/* Abort when "prevlen" has not changed. */
if (next.prevrawlen == rawlen) break;

if (next.prevrawlensize < rawlensize) {
/* 只有需要扩展的才能引发连锁更新*/
offset = p-zl;
extra = rawlensize-next.prevrawlensize;
zl = ziplistResize(zl,curlen+extra);
p = zl+offset;

/* Current pointer and offset for next element. */
np = p+rawlen;
noffset = np-zl;

/* Update tail offset when next element is not the tail element. */
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}

/* Move the tail to the back. */
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipStorePrevEntryLength(np,rawlen);

/* Advance the cursor */
p += rawlen;
curlen += extra;
} else {
if (next.prevrawlensize > rawlensize) {
/* This would result in shrinking, which we want to avoid.
* So, set "rawlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
}

/* Stop here, as the raw length of "next" has not changed. */
break;
}
}
return zl;
}

总结

  1. 压缩列表是列表键和哈希键的底层实现之一;
  2. 列表中包含多个节点,每个节点都可以包含一个字节数组或者整数;
  3. 添加或者删除节点都可以能引发连锁的更新操作;