redis设计与实现——RDB持久化

redis设计与实现——RDB持久化

由于Redis是内存数据库,在服务器进程退出时,服务器状态也会丢失不见,因此Redis提供了RDB持久化功能,可以帮助把内存中的数据库状态保存到磁盘里面,避免数据丢失。

RDB持久化既可以手动执行,也可以服务器配置定期执行,执行后会生成一个经过压缩的二进制RDB文件。

RDB文件的创建与载入

有两个Redis命令可以生成RDB文件——SAVE和BGSAVE,前者会阻塞Redis服务器进程,直到创建完RDB文件,后者则是fork出一个子进程来负责创建RDB文件。

在redis/src/rdb.c中存在实际创建RDB文件的函数rdbSave(),SAVE命令和BGSAVE命令都会以不同方式调用这个函数。

SAVE命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) { // 正在执行BGSAVE
addReplyError(c,"Background save already in progress");
return;
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
// 调用rdbSave保存文件
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
}
}

redis 的事件循环中会去检测redisServer的saveparams字段,判断是否执行BGSAVE,在执行完之后,子进程调用_exit()退出,避免因为父进程正在对文件进行操作而子进程直接回写文件缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;

if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();

start = ustime();
if ((childpid = fork()) == 0) { // Fork一个子进程
int retval;

/* Child */
closeListeningSockets(0); // 关闭子进程的监听
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi); // 调用rdbSave
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1); // 调用_exit(retcode);
} else {
/* Parent */
// 父进程会记录一些BGSAVE状态
}
return C_OK; /* unreached */
}

无论是SAVE还是BGSAVE,最终都需要调用rdbSave完成工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;

// 创建一个临时的rdb文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

// 初始化 static const rio rioFileIO
rioInitWithFile(&rdb,fp);

if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

// 保存rdb文件
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* 使用原子性的重命名操作 */
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}

serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0; // 重设dirty属性
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;

werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

由此可见,rdbSave的操作主要分为两步:

  • 先将数据写到一个临时文件——tmp-%d.rdb;
  • 调用原子性的重命名操作;

自动间隔保存

对于BGSAVE命令,Redis支持用户可以通过制定配置文件或者传入启动参数的方式设置save选项。

  • save 900 1:服务器在900秒之内,对数据库进行了至少一次修改;

redis支持多RDB配置,满足任意一个就可以触发BGSAVE。在redisServer结构体中,存在serverparams字段记录了save条件。该字段结构有两个field:时间和修改次数。

1
2
3
4
5
6
7
8
9
10
11
12
struct saveparam {
time_t seconds;
int changes;
};

struct redisServer {
// ...
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
// ...
}

Redis的服务器周期性操作函数serverCron默认每100ms执行一次,其中一项工作就是检查save选项设置的保存条件是否满足。除了需要检查是否满足在规定时间内操作数据库的次数,还要检查上一次bgsave是否成功,如果不成功的话,需要等待CONFIG_BGSAVE_RETRY_DELAY秒,默认是5秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) {
// 检查是否只在bgsave或者存在aof子进程
} else {
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;

// 检查多个触发条件
// 是否满足操作数和时间
// 上一次bgsave是否成功,如果不成功要等待CONFIG_BGSAVE_RETRY_DELAY秒, #define CONFIG_BGSAVE_RETRY_DELAY 5
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);// 调用bdsave
break;
}
}
}
// ...
}

RDB文件结构

本节主要介绍RDB的文件结构,具体的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10]; // 标识rdb文件
int j;
uint64_t cksum; // 校验和
size_t processed = 0;

if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
// RDB文件的开头,5字节的REDIS和四字节的RDB文件版本
// #define RDB_VERSION 9,当前是9
// 当格式更改不再兼容后向时,此数字将递增
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
// 遍历数据库,dump数据
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict; // 获取所有的键值对
if (dictSize(d) == 0) continue; // 保存非空数据库
di = dictGetSafeIterator(d);

// #define RDB_OPCODE_SELECTDB 254。保存一字节长,表示接下来会读入一个数据库号码,该号码可以使得服务器调用SELECT命令切换数据库
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;

// 写入一个 RESIZE DB 操作码,#define RDB_OPCODE_RESIZEDB 251
// 该数字只是一个重建哈希表的大小参考,不限制实际读取
// 接下来会写入键值对个数
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

/* 遍历该db,写入所有键值对*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;

initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
// 写入expire time, type, key, value
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;


if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}

// ....其它操作

/* 写入EOF #define RDB_OPCODE_EOF 255 /*
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
// 写入校验和
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;

werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}

因此RDB的文件结构可以总结为五个部分:

REDIS db_version Databases EOF check_sum
REDIS 0009 kv内容 255 8字节无符号整数

其中DataBase部分会保存多个非空数据库,总结可以分为三个部分,1字节长的标示码,整数的db序列号和键值对

RDB_OPCODE_SELECTDB | db_number | kv对

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

/* 带有过期时间的键值对保存 */
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}

/* 保存LRU信息 */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}

/* 保存LFU信息 */
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);

if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}

/* 保存type,和键值对 */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;

/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
usleep(server.rdb_key_save_delay);

return 1;
}

键值对的保存结构是通过函数rdbSaveKeyValuePair()实现的,并且带有过期时间的键值对和不带有的都混在一起保存。其中如果有过期时间,则通过开头的RDB_OPCODE_EXPIRETIME_MS进行标示。至于保存类型则有其中,都是1字节长。key都是字符串对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2
#define RDB_TYPE_ZSET 3
#define RDB_TYPE_HASH 4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7
/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
#define RDB_TYPE_STREAM_LISTPACKS 15

结构就是:

RDB_OPCODE_EXPIRETIME_MS | ms | TYPE | key | value

或者:

TYPE | key | value