redis设计与实现——整数集合

redis设计与实现——整数集合

整数集合intset是集合键的底层实现之一,当一个集合只包含整数元素并且元素数量不多时,redis就会用intset作为集合键的底层实现。

整数集合的实现

intset的数据结构在intset.h/c的表示方式如下:

1
2
3
4
5
typedef struct intset {
uint32_t encoding; // 编码方式
uint32_t length; // 集合中元素数量
int8_t contents[]; // 保存元素的数组
} intset;

contents保存的就是intset中各个元素,虽然其声明为int8_t,但实际上其保存的类型取决于encoding的值。

encoding的可能选项有三种:

1
2
3
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

另外,encoding的类型是由contents中最大的一个数决定的。contents数组则按小到大保存着所有元素。

创建空的intset时默认为int16:

1
2
3
4
5
6
7
/* Create an empty intset. */
intset *intsetNew(void) {
intset *is = zmalloc(sizeof(intset));
is->encoding = intrev32ifbe(INTSET_ENC_INT16);// 大小端转换
is->length = 0;
return is;
}

由于contents是一段连续的内存,并存储超过了一个字节,元素也是按照大小排序,因此需要考虑系统的大小端问题。redis都是按照小端来使用,在/src/endianconv.h中有一段相关的宏定义:

1
2
3
4
5
6
7
8
9
10
11
/* variants of the function doing the actual conversion only if the target
* host is big endian */
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define intrev16ifbe(v) (v)
#define intrev32ifbe(v) (v)
#define intrev64ifbe(v) (v)
#else
#define intrev16ifbe(v) intrev16(v)
#define intrev32ifbe(v) intrev32(v)
#define intrev64ifbe(v) intrev64(v)
#endif

升级

每当redis要添加一个新元素到整数集合时,并且新元素的类型比当前整数集合的encoding要更长时,就需要先进行升级。

因此在这种情况下,添加一个新元素的步骤就是:升级、查找和插入。

首先要判断插入元素的长度:

1
2
3
4
5
6
7
8
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX)
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16;
}

以下就是新元素插入的主题函数过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);// 获取新元素的长度
uint32_t pos;
if (success) *success = 1;

/* 因为引发升级的新元素长度总是比intset中现存所有元素的长度都大,因此其要么
* 大于所有元素,要么小于所有元素;
* 因此新元素只需放在底层数组开头或者末尾即可 */
if (valenc > intrev32ifbe(is->encoding)) {
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value); // 升级并插入元素
} else {
/* 如果集合中寻在该元素,则返回
* 如果不存在元素,pos将存着将要被插入的准确位置索引 */
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}

// 多申请一个空间
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 如果没找到相应的pos(即小于value的最大整数所在位置)
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); // 迁移内存,腾出空间给新的数据
}

_intsetSet(is,pos,value); // 在pos插入元素
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

由上可见,在不进行升级的情况,需要先找到对应的pos,即intset中小于value的最大元素,通过二分法查找:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
static int64_t _intsetGet(intset *is, int pos) {
return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}

static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
int64_t v64;
int32_t v32;
int16_t v16;

if (enc == INTSET_ENC_INT64) {
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
memrev64ifbe(&v64);
return v64;
} else if (enc == INTSET_ENC_INT32) {
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
memrev32ifbe(&v32);
return v32;
} else {
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
memrev16ifbe(&v16);
return v16;
}
}

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;

/* 空的intset,直接返回,pos设为0 */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* 如果value大于intset的最大值,则pos设为intset的长度
* 如果value小于intset的最小值,将pos赋值为0
* 此举为是否移动内存元素的判断提供帮助 */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}

// 二分查找
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}


if (value == cur) {
if (pos) *pos = mid; // 找到对应位置
return 1;
} else {
if (pos) *pos = min; // 找不到
return 0;
}
}

前面的判断语句,在pos小于当前长度的时候,需要将pos后面的元素都往后移动一个位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from;
uint32_t encoding = intrev32ifbe(is->encoding);

if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from;
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t);
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
/* 如果目标区域和源区域有重叠的话,memmove() 能够保证源串在被覆盖之前将重叠 * 区域的字节拷贝到目标区域中,复制后源区域的内容会被更改 */
memmove(dst,src,bytes);
}

至于具体的升级操作,则由intsetUpgradeAndAdd完成,包含了encoding升级和插入元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0; // 简单通过正负判断插在开头还是尾部

/* 设置新的encoding和扩展空间 */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

/* 从尾部开始对所有数据进行迁移,重新分配空间 */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

/* 在头部或者尾部设置新的value*/
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

总结

  1. 整数集合是集合键的底层实现之一;
  2. redis能够根据新加元素的类型,改变整个数组的类型;
  3. 整数集合只支持升级,不支持降级操作;