redis设计与实现——对象

redis设计与实现——对象

前面介绍了那么多数据结构,但redis并不是直接使用它们组成键值对,二是在上面封装了一层创建了一个对象系统。另外,redis的对象系统还实现了基于引用计数的内存回收机制和访问时间记录信息,从而能删除那些空转时长较大的key。

对象的类型与编码

redis使用对象来表示数据库中的key和value,redis的对象实现数据结构在src/server.h中:

1
2
3
4
5
6
7
8
9
10
11
12
#define LRU_BITS 24

// 使用bit field节省空间,https://www.geeksforgeeks.org/bit-fields-c/
typedef struct redisObject {
unsigned type:4; // 类型,4bit
unsigned encoding:4; // 编码,4bit
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount; // 引用计数
void *ptr; // 指向底层数据结构的指针
} robj;

类型

object的type字段用于记录对象的类型,分别是字符串、列表、哈希、集合和有序集合。对于redis保存的键值对来说,key总是字符串对象,而value则是上面所说的五种,可用TYPE命令获取对象类型。

1
2
3
4
5
#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */

编码和底层实现

对象的ptr指针指向了对象的底层数据结构,但这些数据结构是由对象的encoding属性决定。encoding的取值如下:

1
2
3
4
5
6
7
8
9
10
11
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

除了OBJ_LIST之外,其它每种类型的对象至少使用了两种不同的编码,使得redis可以根据不同的使用场景来为一个对象设置不同的编码从而优化使用效率。

类型 编码 对象
OBJ_STRING OBJ_ENCODING_INT 用整数值实现的字符串对象
OBJ_STRING OBJ_ENCODING_EMBSTR 用embstr编码sds的字符串对象
OBJ_STRING OBJ_ENCODING_RAW 使用sds实现的字符串对象
OBJ_LIST OBJ_ENCODING_QUICKLIST 使用quicklist实现的列表对象
OBJ_HASH OBJ_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象
OBJ_HASH OBJ_ENCODING_HT 使用字典实现的哈希对象
OBJ_SET OBJ_ENCODING_HT 使用哈希实现的集合对象
OBJ_SET OBJ_ENCODING_INSET 使用整数集合实现的集合对象
OBJ_ZSET OBJ_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象
OBJ_ZSET OBJ_ENCODING_SKIPLIST 使用跳表实现的有序集合对象

字符串对象

由上表可得,字符串的对象编码有三种:int, raw和embst。

当字符串是可以用long类型保存的整数,则转为long。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;

/* 确保是一个字符串对象 */
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

/* 使用某些特殊的编码方式编码raw和emstr */
if (!sdsEncodedObject(o)) return o;

/* 不对共享对象进行编码 */
if (o->refcount > 1) return o;

/* 编码字符串长度小于或等于20,且能够转换成long */
len = sdslen(s);
if (len <= 20 && string2l(s,len,&value)) {
/* 使用共享的整数数据,节省内存
* shared是server的共享数据,保存一些常用数据,
* 用户在使用这部分数据时不用新申请内存 */
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
if (o->encoding == OBJ_ENCODING_RAW) {
sdsfree(o->ptr);
o->encoding = OBJ_ENCODING_INT; // 用int编码
o->ptr = (void*) value;
return o;
} else if (o->encoding == OBJ_ENCODING_EMBSTR) {
decrRefCount(o);
return createStringObjectFromLongLongForValue(value);
}
}
}

/* 对于保存的字符串值长度小于44的进行embstr编码 */
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;

if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
emb = createEmbeddedStringObject(s,sdslen(s));
decrRefCount(o);
return emb;
}

/* We can't encode the object...
*
* Do the last try, and at least optimize the SDS string inside
* the string object to require little space, in case there
* is more than 10% of free space at the end of the SDS string.
*
* We do that only for relatively large strings as this branch
* is only entered if the length of the string is greater than
* OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
trimStringObjectIfNeeded(o);

/* Return the original object. */
return o;
}

如果字符串对象保存的字符串值小于或等于44,则用embstr编码的方式,否则用raw编码的方式。之所以选择44个字节,是因为使用了jemalloc,需要将embstr类型的字符串限定在64字节。而redis object占用了16个字节,当字符串长度小于44时sds会采用占用3字节的sdshdr8保存字符串,因此16+3+44=63,再加上字符串末尾的'\0',刚好是64。

1
2
3
4
5
6
7
8
9
10
11
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len);
else
return createRawStringObject(ptr,len);
}

robj *createRawStringObject(const char *ptr, size_t len) {
return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}

其中embstr编码是用来保存短字符串的一种优化的编码方式,虽然其跟raw一样都是采用redisobject结构和sdshdr结构来保存字符串对象,但embstr是调用一次内存分配函数来分配一块连续的空间(raw是调用两次,空间不连续)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
robj *createEmbeddedStringObject(const char *ptr, size_t len) {
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
struct sdshdr8 *sh = (void*)(o+1);

o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh+1;
o->refcount = 1;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}

sh->len = len;
sh->alloc = len;
sh->flags = SDS_TYPE_8;
if (ptr == SDS_NOINIT)
sh->buf[len] = '\0';
else if (ptr) {
memcpy(sh->buf,ptr,len);
sh->buf[len] = '\0';
} else {
memset(sh->buf,0,len+1);
}
return o;
}

通过将相对于raw两次的内存分配和释放次数降低到一次,并且保存了一块连续的内存空间,也很好地利用了缓存的优势。另外,该编码方式是创建一种unmodifiable string,redis不提供直接修改其的方法。要修改该字符串对象,只能先转为raw。

列表对象

在redis3.2.9之后,quicklist取代了ziplist和linkedlist,成为了列表对象的底层实现。创建一个新的列表对象:

1
2
3
4
5
6
robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate();
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}

由于列表对象只有一种编码方式,因此只是简单调用了quicklistCreate()。

哈希对象

哈希对象有两种编码方式:ziplist或者hashtable。默认是ziplist

1
2
3
4
5
6
robj *createHashObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}

为了探究其编码转换和插入生成哈希对象的方式,我们先来看hset命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void hsetCommand(client *c) {
int i, created = 0;
robj *o;

if ((c->argc % 2) == 1) {
addReplyError(c,"wrong number of arguments for HMSET");
return;
}
// 从db中查找或者创建一个哈希对象
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
hashTypeTryConversion(o,c->argv,2,c->argc-1);// 尝试转换编码

for (i = 2; i < c->argc; i += 2)
// 真正去添加新的键值对
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

/* HMSET (deprecated) and HSET return value is different. */
char *cmdname = c->argv[0]->ptr;
if (cmdname[1] == 's' || cmdname[1] == 'S') {
/* HSET */
addReplyLongLong(c, created); // 通知客户端更改了多少个
} else {
/* HMSET */
addReply(c, shared.ok);
}
signalModifiedKey(c->db,c->argv[1]);// 通知数据变更
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);// 推送变更的订阅消息
server.dirty++;
}

首先来看hashTypeLookupWriteOrCreate。

1
2
3
4
5
6
7
8
9
10
11
12
13
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
robj *o = lookupKeyWrite(c->db,key);// 从db中查找
if (o == NULL) {
o = createHashObject();// 不存在则新创建一个
dbAdd(c->db,key,o);
} else {
if (o->type != OBJ_HASH) {
addReply(c,shared.wrongtypeerr);
return NULL;
}
}
return o;
}

接着来看hashTypeTryConversion,通过检查ptr对应sds长度是否比hash_max_ziplist_value更大,则转换到哈希编码的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;

if (o->encoding != OBJ_ENCODING_ZIPLIST) return;

for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}

hashTypeSet的作用是往哈希对象添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;

if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;

zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1); // 从跳跃表中查找对应的field
if (fptr != NULL) {
/* 拿到value对应的指针为止,在field之后 */
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;

/* 删除当前的值 */
zl = ziplistDelete(zl, &vptr);

/* 插入新的值 */
zl = ziplistInsert(zl, vptr, (unsigned char*)value,
sdslen(value));
}
}

if (!update) {
/* 将field/value对插入到ziplist的尾部,其中filed在value的前面*/
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl;

/* 检查是否需要把ziplist编码转换为哈希编码,这是另一种转换编码的条件,如果哈希对象的键值对个数大于 512则需要转换编码*/
// #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
// 哈希编码的哈希对象其中每个键值对都是使用字典的键值对保存,并且key和value都是字符串对象
dictEntry *de = dictFind(o->ptr,field);
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
}

/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}

由此可见,当哈希对象同时满足以下两个条件才会使用ziplist编码:

  • 哈希对象保存的所有key/value的字符串长度都小于64个字节;
  • 哈希对象保存的键值对个数小于512个;

以上代码都在t_hash.c中。

集合对象

集合对象有两种编码方式:intset或者hashtable

以sadd命令对集合对象的编码方式做解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int isSdsRepresentableAsLongLong(sds s, long long *llval) {
return string2ll(s,sdslen(s),llval) ? C_OK : C_ERR;
}

robj *setTypeCreate(sds value) {
// 判断sds是否能用longlong表示
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject(); // 返回整数集合编码的对象
return createSetObject(); // 返回hash编码的对象
}

// 在src/object.c中实现
robj *createIntsetObject(void) {
intset *is = intsetNew(); // 底层数据结构 intset
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}

robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL); // 底层数据结构 字典
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}

void saddCommand(client *c) {
robj *set;
int j, added = 0;

set = lookupKeyWrite(c->db,c->argv[1]); // 寻找key对应对象
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr); // 新key则创建一个
dbAdd(c->db,c->argv[1],set);
} else {
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}

for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added; // add了多少次,添加到执行命令数量里
addReplyLongLong(c,added); 返回结果给客户端
}

而添加新的值的方式,则是通过调用setTypeAdd()实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* 如果集合对象的个数太多(默认是多于512),则转为哈希编码 */
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* 无法转为整数,则使用哈希编码 */
setTypeConvert(subject,OBJ_ENCODING_HT);

/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK); // 添加新的数据到字典
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}

由此可见,当集合对象的个数大于server.set_max_intset_entries(默认为512)或者集合对象保存了非整数值的元素,则需要使用哈希编码。否则可以用整数集合编码。

有序集合对象

有序集合zset有两种编码方式,一种是ziplist,另一种就是skiplist。

我们通过zadd命令来看,这两种编码的使用和转换方法,在src/t_zset.c实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
/* 前面有一系列参数的初始化,包括对客户端的响应,添加的参数统计等等
* 比如初始化 elements = c->argc-scoreidx; elements /= 2; */

/* 解析所有的score,保证事务,要么全部完成,要么就什么都不做 */
scores = zmalloc(sizeof(double)*elements); // 初始化分数值
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) // 把所有传递进的分值放到scores里
!= C_OK) goto cleanup;
}
/* 查找key是否存在 */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
// zset_max_ziplist_entries设置为0或者长度大于zset_max_ziplist_value(默认为64)
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
// 创建zset对象,该zset对象使用skiplist编码
zobj = createZsetObject();
} else {
// 创建ziplist编码的zset对象
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}

for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = flags;

ele = c->argv[scoreidx+1+j*2]->ptr;
// 往有序列表插入或者更新一个新的元素
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);

reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}

cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}

添加或者更新元素的主要实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;

/* 检查分值是否为nan*/
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}

/* 更具编码区更新有序列表,压缩列表 */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
// 元素存在
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}

/* 增加score */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* 删除后重新插入 */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* 优化: 检查元素是否太大,或者有序列表太长,如果满足了则进行转换 */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
// 有序列表长度超过zset_max_ziplist_entries(默认64)
// 元素的字符串长度超过zset_max_ziplist_entries(默认128)
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;

de = dictFind(zs->dict,ele);
if (de != NULL) {
/* 元素已经存在 */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changes. */
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* 并没有移除原来的元素,而是更新表示哈希表的字典 */
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);//往跳跃表添加一个元素
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); // 往哈希表添加一个元素
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}

由此可见,在创建skiplist编码的有序集合时,会创建一个zset对象。该zset包含一个字节和一个跳跃表。但两者只会存储一份数据,hashTable和skiplist共享元素的成员和分值。这样就可以保证在执行ZSCORE命令时,通过哈希表可以在O(1)的时间获取结果,而执行ZRANK,ZRANGE这些则可以用skiplist更快得到范围结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;

zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}

类型检查与命令多态

类型检查的实现

为了确保指定类型的键才可以执行某些特定的命令,在执行命令之前会先检查输入键的类型正确与否。

例如当我们使用LLEN命令,其会去检查操作对象是否为一个列表键。

1
2
3
4
5
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return; // 检查类型是否LIST
addReplyLongLong(c,listTypeLength(o));
}

有些命令还需要检查对象的编码方式,然后根据不同的编码调用不同的函数。这就是命令多态的来源。

内存回收

C语言不能自动做垃圾回收,因此redis构造了一个引用计数的技术来做内存回收。即redisobject1中refcount字段。

  • 当创建新对象时,引用计数为1;
  • 当对象被一个新程序使用时,引用计数+1;
  • 当对象不再被一个程序使用时,引用计数-1;
  • 当对象的引用计数为0,对象所占用的内存被释放;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 递增引用计数
#define OBJ_SHARED_REFCOUNT INT_MAX
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}

// 递减引用计数
void decrRefCount(robj *o) {
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}

// 将引用计数设为0,但不释放对象。通常用于传递对象到一个新的函数里
// 例如:functionThatWillIncrementRefCount(resetRefCount(CreateObject(...)));
robj *resetRefCount(robj *obj) {
obj->refcount = 0;
return obj;
}

对象共享

为了节省内存,redis会创建一些特殊对象用于全局共享。例如redis会创建10000个字符串对象,包含了从0到9999的所有整数值。那么当服务器要用到这些对象时,会直接取出共享对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
void trimStringObjectIfNeeded(robj *o) {
// ....
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
decrRefCount(o);//销毁原字符串对象
incrRefCount(shared.integers[value]);//共享对象引用计数+1
return shared.integers[value];//返回共享对象
}
// ...
}

在server.c中预先创建10000个对象。

1
2
3
4
5
6
7
8
9
10
#define OBJ_SHARED_INTEGERS 10000
void createSharedObjects(void) {
//....
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
shared.integers[j]->encoding = OBJ_ENCODING_INT;
}
//...
}

因此对于这些共享对象,服务器会默认持有一个引用计数。

对象的空转时长

这部分特性通过redisobject的lru属性实现,该字段记录了最后一次被命令程序访问的时间。

使用OBJECT命令可以访问key对象,但不会修改其的lru属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void objectCommand(client *c) {
robj *o;

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
"ENCODING <key> -- Return the kind of internal representation used in order to store the value associated with a key.",
"FREQ <key> -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.",
"IDLETIME <key> -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.",
"REFCOUNT <key> -- Return the number of references of the value associated with the specified key.",
NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c,o->refcount);
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
} else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
/* LFUDecrAndReturn should be called
* in case of the key has not been accessed for a long time,
* because we update the access time only
* when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o));
} else {
addReplySubcommandSyntaxError(c);
}
}

robj *objectCommandLookup(client *c, robj *key) {
dictEntry *de;
// 直接到db去查找key
if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
return (robj*) dictGetVal(de);
}

通过源码可见,这个命令在访问key对象时,不会修改对象的lru属性,因为时直接到db去查找状态的。

而更新lru字段的处理需要经过db.c。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 底层级的查找
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);

// 当存在rdb和aof子进程运行时,不进行lru更新,避免不断地写副本
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
// 使用lfu策略
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK(); // 更新lru时间
}
}
return val;
} else {
return NULL;
}
}