redis设计与实现——AOF持久化

AOF持久化

RDB持久化是通过保存数据库中的键值对来记录数据库状态的,而AOF则是通过保存redis服务器所执行的命令来完成记录的。

例如执行命令:

1
2
SET msg "hello"
RPUSH numbers 128 256 512

那么RDB的持久化就是保存msg和numbers的键值对,而AOF则是保存SET和RPUSH的命令。

AOF持久化的实现

AOF的持久化功能分为命令追加、文件写入、文件同步三个步骤。

命令追加

打开AOF持久化功能后,服务器在执行完一个写命令后,会以redis的协议格式将被执行的命令追加到服务器状态的缓冲区,即redisServer结构的aof_buf字段。在大量写请求的情况下,先利用缓冲区缓存一部分命令,而后再根据某种策略写入磁盘,可以减少IO次数。

1
2
3
/* Excerpt of the global server state: only the AOF buffer field is shown. */
struct redisServer {
    sds aof_buf; /* AOF buffer, written before entering the event loop */
};

AOF文件的写入与同步

Redis的服务器进程中有一个事件循环,正如上面注释所说的,每次进入事件循环(阻塞等待事件)之前,都会在beforeSleep()中调用flushAppendOnlyFile()函数,该函数则根据配置选项决定如何写入并同步AOF文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* Excerpt: hook that receives the event loop as its argument. The only
 * visible work is a call to flushAppendOnlyFile() with force == 0; everything
 * else the function does is elided. */
void beforeSleep(struct aeEventLoop *eventLoop)  {
// ...
flushAppendOnlyFile(0);
// ...
}

/* Select the sync primitive used for the AOF file: fdatasync when __linux__
 * is defined, fsync otherwise. */
#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif

/* Queue a BIO_AOF_FSYNC background job for file descriptor `fd`.
 * The descriptor is smuggled through the job's first pointer argument; the
 * remaining two job arguments are unused (NULL). */
void aof_background_fsync(int fd) {
    void *job_arg = (void*)(long)fd;

    bioCreateBackgroundJob(BIO_AOF_FSYNC,job_arg,NULL,NULL);
}

/* Excerpt of the AOF flush routine: only the fsync stage (label try_fsync) is
 * shown; everything before it is elided.
 *
 * Visible behavior:
 *  - returns without syncing when server.aof_no_fsync_on_rewrite is set and
 *    hasActiveChildProcess() is true;
 *  - AOF_FSYNC_ALWAYS: calls redis_fsync() on server.aof_fd inline, then
 *    records the synced offset and the sync time;
 *  - AOF_FSYNC_EVERYSEC: once server.unixtime is greater than
 *    server.aof_last_fsync, hands the sync to aof_background_fsync() unless
 *    sync_in_progress is set, and updates server.aof_last_fsync either way.
 *
 * NOTE(review): `force` is not referenced in the visible part, and `latency`
 * and `sync_in_progress` are declared in the elided part — confirm their exact
 * meaning against the full source. */
void flushAppendOnlyFile(int force) {
//...
try_fsync:
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;

if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
latencyStartMonitor(latency); // monitoring
redis_fsync(server.aof_fd); /* sync to disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd); // start a job in a separate thread to run fsync()
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
  • force:如果持久化策略为everysec,就有一定的可能延迟flush,因为后台线程可能还在进行fsync();而如果force设成1,则无论什么情况都会进行写入。

另外,由于在Linux中用户调用write函数时,操作系统会先将写入数据保存在一个内存缓冲区中,而不是立即写入磁盘,因此redis支持通过服务器配置appendfsync选项来定义上面函数的同步行为:

1
2
3
4
5
/* Append only defines: fsync policies compared against server.aof_fsync. */
#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC // default policy
  • AOF_FSYNC_ALWAYS:将aof_buf缓冲区的所有内容写入并同步到AOF文件;
  • AOF_FSYNC_EVERYSEC:将aof_buf缓冲区的所有内容写入AOF文件,如果上次同步AOF文件的时间距离现在超过1秒,则再次进行同步;
  • AOF_FSYNC_NO:写入文件但不同步;

AOF文件的载入与数据还原

由于AOF文件包含了重建数据库所需的所有写命令,因此只需要重新读入并执行一遍,就可以恢复服务器状态了。其实现函数为loadAppendOnlyFile():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* Excerpt: replay the AOF file `filename` through a fake client.
 *
 * Visible steps:
 *  1. open the file; log a warning and exit(1) if fopen() fails;
 *  2. for a zero-length file, reset the size/offset counters, close the file
 *     and return C_ERR;
 *  3. set server.aof_state to AOF_OFF, create the fake client and mark the
 *     load as started;
 *  4. read 5 bytes and compare them with the "REDIS" signature (the handling
 *     of the result is elided);
 *  5. loop over commands: queue the command while the fake client has
 *     CLIENT_MULTI set (unless the command is EXEC), otherwise call cmd->proc.
 *
 * Command parsing, loop termination, cleanup and the final return value are
 * elided from this excerpt. */
int loadAppendOnlyFile(char *filename) {
struct client *fakeClient; // fake client used to execute the commands
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */

if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}

/* Special-case an AOF file of length zero */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
server.aof_fsync_offset = server.aof_current_size;
fclose(fp);
return C_ERR;
}

/* Turn AOF off while loading, so that no new records are written to the
 * same file */
server.aof_state = AOF_OFF;

fakeClient = createAOFClient();
startLoadingFile(fp, filename); // set the global state marking that a file is being loaded

/* Check for an RDB preamble: this branch is taken when the first 5 bytes
 * are NOT the "REDIS" signature (body elided) */
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
// ...
}

/* Read the AOF file and execute the commands one by one. */
while(1) {
// ... read cmd

/* Remember the last valid offset seen before a MULTI command */
if (cmd == server.multiCommand) valid_before_multi = valid_up_to;

/* Execute the command in the context of the fake client */
fakeClient->cmd = cmd;
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
queueMultiCommand(fakeClient);
} else {
cmd->proc(fakeClient);
}

/* The fake client must not have any reply buffered */
serverAssert(fakeClient->bufpos == 0 &&
listLength(fakeClient->reply) == 0);

/* The fake client must not be blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);

// ...
}

// ....
}
  1. 由代码可见,首先是创建一个不带网络连接,不做回应不受blocked的客户端,因为执行命令只能在客户端上下文执行;
  2. 从AOF文件中分析并读出写命令;
  3. 用伪客户端执行该命令,直到所有命令处理完毕;

AOF重写

为了解决AOF文件体积膨胀的问题,Redis提供了AOF文件重写的功能,即Redis服务器会创建一个新的AOF文件来替代现有的AOF文件,并去除任何浪费空间的冗余命令。

AOF文件重写的实现

事实上,AOF文件重写并不会对老的AOF文件进行任何读取、分析或者写入操作,而是通过直接读取当前数据库的状态实现的。aof的重写是通过函数rewriteAppendOnlyFileRio实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/* Write to `aof` the commands needed to rebuild the current dataset.
 *
 * For every non-empty database a SELECT command is emitted, followed by one
 * or more commands per key (chosen by the value's type) and, when the key has
 * an expire time, a PEXPIREAT command.
 *
 * Returns C_OK on success. If any write helper returns 0 the function jumps
 * to werr, releases the dictionary iterator if one is open, and returns
 * C_ERR. */
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
size_t processed = 0;
int j;

// Iterate over every database
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; // SELECT command prefix
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue; /* nothing to emit for an empty database */
di = dictGetSafeIterator(d);

/* Emit the SELECT command followed by the database number */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

/* Iterate over every key/value entry of this database */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;

keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);

expiretime = getExpire(db,&key);

/* Rewrite the key according to the type of its value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
/* If the key has an expire time (getExpire() did not return -1), save it
 * with a PEXPIREAT command */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* Read the diff from the parent process each time more than
 * AOF_READ_DIFF_INTERVAL_BYTES have been processed since the last read */
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* so the werr path does not release it a second time */
}
return C_OK;

werr:
if (di) dictReleaseIterator(di);
return C_ERR;
}

另外,以写入集合键为例,可以看到为了避免在执行命令时导致客户端输入缓冲区溢出,重写快速链表、哈希表、集合和有序集合这种带有多个元素的key时,会先检查key包含的元素数量。如果超过了AOF_REWRITE_ITEMS_PER_CMD,则会使用多条命令进行重写。默认是64。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Maximum number of elements emitted in a single rewritten command. */
#define AOF_REWRITE_ITEMS_PER_CMD 64

/* Excerpt: emit the RPUSH commands needed to rebuild the list `o` stored at
 * `key` into `r`. Each RPUSH carries at most AOF_REWRITE_ITEMS_PER_CMD
 * elements, so a long list is split across several commands.
 *
 * In the visible paths, returns 0 as soon as a write helper returns 0 and 1
 * otherwise. The iterator setup (`li`, `entry`), the per-element value write
 * and the non-quicklist branch are elided. */
int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
//........
while (quicklistNext(li,&entry)) {
if (count == 0) {
/* Start of a new command: write its header */
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items; // cap the number of elements at AOF_REWRITE_ITEMS_PER_CMD
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}

// write the value (omitted)

if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; // limit reached: the next element starts a new RPUSH command
items--;
}
quicklistReleaseIterator(li);
} else {
// ...
}
return 1;
}

AOF后台重写

为了避免AOF重写函数阻塞服务器处理客户端的请求,Redis将AOF重写放到子进程中执行。同时,在子进程执行AOF重写期间,服务器进程仍在处理新的请求,会使现有数据库状态发生改变;为了解决这种不一致,Redis设置了一个AOF重写缓冲区,在服务器创建完子进程后开始使用:当Redis执行完一个写命令之后,会同时将写命令发送到AOF缓冲区和AOF重写缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* BGREWRITEAOF command handler.
 *
 * Replies with an error when an AOF rewrite child already exists, schedules
 * the rewrite when some other child process is active, and otherwise tries to
 * start the background rewrite right away. */
void bgrewriteaofCommand(client *c) {
    /* A rewrite child is already running: refuse. */
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    /* Another child is active: just flag the rewrite as scheduled. */
    if (hasActiveChildProcess()) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    /* No child at all: start the rewrite now and report the outcome. */
    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReplyError(c,"Can't execute an AOF background rewriting. "
                        "Please check the server logs for more information.");
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}

首先判断是否已经存在AOF重写子进程(aof_child_pid != -1),倘若有则直接返回错误;如果存在其他活跃的子进程,则只把aof_rewrite_scheduled置为1,等该子进程结束后再执行重写;否则调用rewriteAppendOnlyFileBackground() fork出子进程。在子进程完成aof重写后,会发一个信号给父进程,父进程会调用backgroundRewriteDoneHandler()将aof重写缓冲区中的所有内容写入到新的aof文件中,然后原子性地覆盖旧的aof文件。重写缓冲区的内容是通过aofRewriteBufferWrite写入到新的aof文件中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Write every block of server.aof_rewrite_buf_blocks to `fd`, in list order.
 *
 * Returns the total number of bytes written, or -1 as soon as a write() does
 * not write a whole block; when that write() returned 0, errno is set to EIO
 * before returning. Blocks with `used` equal to zero are skipped. */
ssize_t aofRewriteBufferWrite(int fd) {
    listIter iter;
    listNode *node;
    ssize_t total = 0;

    listRewind(server.aof_rewrite_buf_blocks,&iter);
    for (node = listNext(&iter); node; node = listNext(&iter)) {
        aofrwblock *blk = listNodeValue(node);

        if (!blk->used) continue; /* empty block: nothing to write */

        ssize_t written = write(fd,blk->buf,blk->used);
        if (written != (ssize_t)blk->used) {
            /* Short or failed write: a zero-byte write leaves errno
             * untouched, so report it as an I/O error explicitly. */
            if (written == 0) errno = EIO;
            return -1;
        }
        total += written;
    }
    return total;
}