1、申 请 I D :logbird
2、个人邮箱:logbird@126.com
3、原创技术文章:Redis 2.8.9源码 - RDB 模块 - save 和 load处理流程 |
本人博客:http://my.oschina.net/fuckphp/blog 本文代码可以在 src/rdb.h 和 src/rdb.c 两个文件中找到,涉及其余文件的会另行注明。 RDB 是 Redis 实现持久化的方式之一,主要用于将数据库中的内存数据按不同类型处理后直接导出到硬盘,并在需要的时候用来恢复数据;Redis 主从同步中,主节点向从节点做全量推送时,也是通过发送 rdb 文件来实现的。 Redis 在进行 rdbSave 时的执行流程如图: rdbSave(如下代码定义在 src/rdb.c 中):Redis 中实现 rdb 保存操作的主函数,所有的流程逻辑都在此处
[size=1em][backcolor=rgb(108, 226, 108) !important][color=white !important][size=1em]?
[size=1em]1
[size=1em]2
[size=1em]3
[size=1em]4
[size=1em]5
[size=1em]6
[size=1em]7
[size=1em]8
[size=1em]9
[size=1em]10
[size=1em]11
[size=1em]12
[size=1em]13
[size=1em]14
[size=1em]15
[size=1em]16
[size=1em]17
[size=1em]18
[size=1em]19
[size=1em]20
[size=1em]21
[size=1em]22
[size=1em]23
[size=1em]24
[size=1em]25
[size=1em]26
[size=1em]27
[size=1em]28
[size=1em]29
[size=1em]30
[size=1em]31
[size=1em]32
[size=1em]33
[size=1em]34
[size=1em]35
[size=1em]36
[size=1em]37
[size=1em]38
[size=1em]39
[size=1em]40
[size=1em]41
[size=1em]42
[size=1em]43
[size=1em]44
[size=1em]45
[size=1em]46
[size=1em]47
[size=1em]48
[size=1em]49
[size=1em]50
[size=1em]51
[size=1em]52
[size=1em]53
[size=1em]54
[size=1em]55
[size=1em]56
[size=1em]57
[size=1em]58
[size=1em]59
[size=1em]60
[size=1em]61
[size=1em]62
[size=1em]63
[size=1em]64
[size=1em]65
[size=1em]66
[size=1em]67
[size=1em]68
[size=1em]69
[size=1em]70
[size=1em]71
[size=1em]72
[size=1em]73
[size=1em]74
[size=1em]75
[size=1em]76
[size=1em]77
[size=1em]78
[size=1em]79
[size=1em]80
[size=1em]81
[size=1em]82
[size=1em]83
[size=1em]84
[size=1em]85
[size=1em]86
[size=1em]87
[size=1em]88
[size=1em]89
[size=1em]90
[size=1em]91
[size=1em]92
[size=1em]93
[size=1em]94
[size=1em]95
[size=1em]96
[size=1em]97
[size=1em]98
[size=1em]99
[size=1em]100
| [size=1em][size=1em]int rdbSave(char *filename) {
[size=1em] //字典迭代器
[size=1em] dictIterator *di = NULL;
[size=1em] dictEntry *de;
[size=1em] char tmpfile[256];
[size=1em] char magic[10];
[size=1em] int j;
[size=1em] long long now = mstime();
[size=1em] FILE *fp;
[size=1em] //创建rio对象
[size=1em] rio rdb;
[size=1em] uint64_t cksum;
[size=1em] //临时文件
[size=1em] snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
[size=1em] //创建一个临时文件
[size=1em] fp = fopen(tmpfile,"w");
[size=1em] if (!fp) {
[size=1em] redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
[size=1em] strerror(errno));
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] //初始化 file rio
[size=1em] rioInitWithFile(&rdb,fp);
[size=1em] //设置校验和计算函数
[size=1em] if (server.rdb_checksum)
[size=1em] rdb.update_cksum = rioGenericUpdateChecksum;
[size=1em] //RDB版本
[size=1em] snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
[size=1em] //临时文件中写入rdb版本号
[size=1em] if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
[size=1em] //遍历每一个数据库
[size=1em] for (j = 0; j < server.dbnum; j++) {
[size=1em] //设置 当前遍历的数据库
[size=1em] redisDb *db = server.db+j;
[size=1em] //取到key space对应的字典
[size=1em] dict *d = db->dict;
[size=1em] if (dictSize(d) == 0) continue;
[size=1em] //创建一个安全迭代
[size=1em] di = dictGetSafeIterator(d);
[size=1em] //安全迭代失败返回错误
[size=1em] if (!di) {
[size=1em] fclose(fp);
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] //写入当前执行rdb的数据库号
[size=1em] if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
[size=1em] //通过位运算压缩写入长度 详细见后文
[size=1em] if (rdbSaveLen(&rdb,j) == -1) goto werr;
[size=1em] //进行安全迭代
[size=1em] while((de = dictNext(di)) != NULL) {
[size=1em] //获取当前遍历的key val
[size=1em] sds keystr = dictGetKey(de);
[size=1em] robj key, *o = dictGetVal(de);
[size=1em] long long expire;
[size=1em] //初始化一个string类型的Redis Object
[size=1em] initStaticStringObject(key,keystr);
[size=1em] //获取key的过期时间
[size=1em] expire = getExpire(db,&key);
[size=1em] //存储 key value
[size=1em] if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
[size=1em] }
[size=1em] //释放安全迭代器
[size=1em] dictReleaseIterator(di);
[size=1em] }
[size=1em] di = NULL; ;
[size=1em] /* EOF opcode */
[size=1em] //标示rdb结束
[size=1em] if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
[size=1em] //更新校验和,并写入校验和
[size=1em] cksum = rdb.cksum;
[size=1em] //翻转校验和
[size=1em] memrev64ifbe(&cksum);
[size=1em] rioWrite(&rdb,&cksum,8);
[size=1em] //将剩余没有 flush的数据 flush到硬盘上
[size=1em] if (fflush(fp) == EOF) goto werr;
[size=1em] if (fsync(fileno(fp)) == -1) goto werr;
[size=1em] if (fclose(fp) == EOF) goto werr; //使用临时文件替换rdb文件
[size=1em] if (rename(tmpfile,filename) == -1) {
[size=1em] redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
[size=1em] unlink(tmpfile);
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] redisLog(REDIS_NOTICE,"DB saved on disk");
[size=1em] server.dirty = 0;
[size=1em] //更新最后修改时间
[size=1em] server.lastsave = time(NULL);
[size=1em] //更新状态
[size=1em] server.lastbgsave_status = REDIS_OK;
[size=1em] return REDIS_OK;
[size=1em]werr:
[size=1em] fclose(fp);
[size=1em] unlink(tmpfile);
[size=1em] redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
[size=1em] if (di) dictReleaseIterator(di);
[size=1em] return REDIS_ERR;
[size=1em]}
|
rdbSaveBackground:Redis 执行 rdbSave 操作的时候会阻塞主进程,此时无法继续响应客户端请求,这对于一个数据库来说是不可接受的,所以 Redis 会 fork 出一个子进程来执行 rdbSave 操作 [size=1em][backcolor=rgb(108, 226, 108) !important][color=white !important][size=1em]?
[size=1em]1
[size=1em]2
[size=1em]3
[size=1em]4
[size=1em]5
[size=1em]6
[size=1em]7
[size=1em]8
[size=1em]9
[size=1em]10
[size=1em]11
[size=1em]12
[size=1em]13
[size=1em]14
[size=1em]15
[size=1em]16
[size=1em]17
[size=1em]18
[size=1em]19
[size=1em]20
[size=1em]21
[size=1em]22
[size=1em]23
[size=1em]24
[size=1em]25
[size=1em]26
[size=1em]27
[size=1em]28
[size=1em]29
[size=1em]30
[size=1em]31
[size=1em]32
[size=1em]33
[size=1em]34
[size=1em]35
[size=1em]36
[size=1em]37
[size=1em]38
[size=1em]39
[size=1em]40
[size=1em]41
[size=1em]42
[size=1em]43
[size=1em]44
| [size=1em][size=1em]int rdbSaveBackground(char *filename) {
[size=1em] pid_t childpid;
[size=1em] long long start;
[size=1em] if (server.rdb_child_pid != -1) return REDIS_ERR;
[size=1em] server.dirty_before_bgsave = server.dirty;
[size=1em] server.lastbgsave_try = time(NULL);
[size=1em] start = ustime();
[size=1em] //创建子进程,并复制父进程的进程空间
[size=1em] if ((childpid = fork()) == 0) {
[size=1em] int retval;
[size=1em] //子进程关闭继承的socket
[size=1em] closeListeningSockets(0);
[size=1em] //设置当前状态
[size=1em] redisSetProcTitle("redis-rdb-bgsave");
[size=1em] //执行 rdbSave 操作
[size=1em] retval = rdbSave(filename);
[size=1em] if (retval == REDIS_OK) {
[size=1em] size_t private_dirty = zmalloc_get_private_dirty();
[size=1em] if (private_dirty) {
[size=1em] redisLog(REDIS_NOTICE,
[size=1em] "RDB: %zu MB of memory used by copy-on-write",
[size=1em] private_dirty/(1024*1024));
[size=1em] }
[size=1em] }
[size=1em] exitFromChild((retval == REDIS_OK) ? 0 : 1);
[size=1em] } else {
[size=1em] /* Parent */
[size=1em] server.stat_fork_time = ustime()-start;
[size=1em] if (childpid == -1) {
[size=1em] server.lastbgsave_status = REDIS_ERR;
[size=1em] redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
[size=1em] strerror(errno));
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
[size=1em] server.rdb_save_time_start = time(NULL);
[size=1em] server.rdb_child_pid = childpid;
[size=1em] updateDictResizePolicy();
[size=1em] return REDIS_OK;
[size=1em] }
[size=1em] return REDIS_OK; /* unreached */
[size=1em]}
|
另外一个比较重要的函数是 rdbSaveKeyValuePair,它负责把单个 key/value 对(以及可选的过期时间)写入 rdb: [size=1em][backcolor=rgb(108, 226, 108) !important][color=white !important][size=1em]?
[size=1em]1
[size=1em]2
[size=1em]3
[size=1em]4
[size=1em]5
[size=1em]6
[size=1em]7
[size=1em]8
[size=1em]9
[size=1em]10
[size=1em]11
[size=1em]12
[size=1em]13
[size=1em]14
[size=1em]15
[size=1em]16
[size=1em]17
[size=1em]18
[size=1em]19
| [size=1em][size=1em]int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
[size=1em] long long expiretime, long long now)
[size=1em]{
[size=1em] /* Save the expire time */
[size=1em] if (expiretime != -1) {
[size=1em] /* If this key is already expired skip it */
[size=1em] if (expiretime < now) return 0;
[size=1em] //如果存在有效期 并且未过期 写入类型 和 过期时间
[size=1em] if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
[size=1em] if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
[size=1em] }
[size=1em] //写入 type key 和 value
[size=1em] if (rdbSaveObjectType(rdb,val) == -1) return -1;
[size=1em] //保存key信息
[size=1em] if (rdbSaveStringObject(rdb,key) == -1) return -1;
[size=1em] //保存value对象
[size=1em] if (rdbSaveObject(rdb,val) == -1) return -1;
[size=1em] return 1;
[size=1em]}
|
redis在进行rdbLoad时候的执行如图:
rdbLoad :
[size=1em][backcolor=rgb(108, 226, 108) !important][color=white !important][size=1em]?
[size=1em]1
[size=1em]2
[size=1em]3
[size=1em]4
[size=1em]5
[size=1em]6
[size=1em]7
[size=1em]8
[size=1em]9
[size=1em]10
[size=1em]11
[size=1em]12
[size=1em]13
[size=1em]14
[size=1em]15
[size=1em]16
[size=1em]17
[size=1em]18
[size=1em]19
[size=1em]20
[size=1em]21
[size=1em]22
[size=1em]23
[size=1em]24
[size=1em]25
[size=1em]26
[size=1em]27
[size=1em]28
[size=1em]29
[size=1em]30
[size=1em]31
[size=1em]32
[size=1em]33
[size=1em]34
[size=1em]35
[size=1em]36
[size=1em]37
[size=1em]38
[size=1em]39
[size=1em]40
[size=1em]41
[size=1em]42
[size=1em]43
[size=1em]44
[size=1em]45
[size=1em]46
[size=1em]47
[size=1em]48
[size=1em]49
[size=1em]50
[size=1em]51
[size=1em]52
[size=1em]53
[size=1em]54
[size=1em]55
[size=1em]56
[size=1em]57
[size=1em]58
[size=1em]59
[size=1em]60
[size=1em]61
[size=1em]62
[size=1em]63
[size=1em]64
[size=1em]65
[size=1em]66
[size=1em]67
[size=1em]68
[size=1em]69
[size=1em]70
[size=1em]71
[size=1em]72
[size=1em]73
[size=1em]74
[size=1em]75
[size=1em]76
[size=1em]77
[size=1em]78
[size=1em]79
[size=1em]80
[size=1em]81
[size=1em]82
[size=1em]83
[size=1em]84
[size=1em]85
[size=1em]86
[size=1em]87
[size=1em]88
[size=1em]89
[size=1em]90
[size=1em]91
[size=1em]92
[size=1em]93
[size=1em]94
[size=1em]95
[size=1em]96
[size=1em]97
[size=1em]98
[size=1em]99
[size=1em]100
[size=1em]101
[size=1em]102
[size=1em]103
[size=1em]104
[size=1em]105
[size=1em]106
| [size=1em][size=1em]int rdbLoad(char *filename) {
[size=1em] uint32_t dbid;
[size=1em] int type, rdbver;
[size=1em] redisDb *db = server.db+0;
[size=1em] char buf[1024];
[size=1em] long long expiretime, now = mstime();
[size=1em] FILE *fp;
[size=1em] rio rdb;
[size=1em] //打开rdb文件
[size=1em] if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
[size=1em] rioInitWithFile(&rdb,fp);
[size=1em] //设置校验和函数
[size=1em] rdb.update_cksum = rdbLoadProgressCallback;
[size=1em] //这只每次处理的最大块
[size=1em] rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
[size=1em] //读取redis版本
[size=1em] if (rioRead(&rdb,buf,9) == 0) goto eoferr;
[size=1em] buf[9] = '\0';
[size=1em] //验证是否REDIS开头
[size=1em] if (memcmp(buf,"REDIS",5) != 0) {
[size=1em] fclose(fp);
[size=1em] redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
[size=1em] errno = EINVAL;
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] //验证版本 必须大于1 并且 比当前版本低
[size=1em] rdbver = atoi(buf+5);
[size=1em] if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
[size=1em] fclose(fp);
[size=1em] redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
[size=1em] errno = EINVAL;
[size=1em] return REDIS_ERR;
[size=1em] }
[size=1em] //标记开始进行加载
[size=1em] startLoading(fp);
[size=1em] while(1) {
[size=1em] robj *key, *val;
[size=1em] expiretime = -1;
[size=1em] //读取当前行的类型
[size=1em] if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
[size=1em] //读取秒级别的过期时间
[size=1em] if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
[size=1em] if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
[size=1em] //继续读取redis key的类型
[size=1em] if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
[size=1em] expiretime *= 1000;
[size=1em] } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
[size=1em] //读取毫秒级别的过期时间
[size=1em] if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
[size=1em] //继续读取redis key的类型
[size=1em] if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
[size=1em] }
[size=1em] //如果结束则退出
[size=1em] if (type == REDIS_RDB_OPCODE_EOF)
[size=1em] break;
[size=1em] //选择切换db
[size=1em] if (type == REDIS_RDB_OPCODE_SELECTDB) {
[size=1em] if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
[size=1em] goto eoferr;
[size=1em] if (dbid >= (unsigned)server.dbnum) {
[size=1em] redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
[size=1em] exit(1);
[size=1em] }
[size=1em] db = server.db+dbid;
[size=1em] continue;
[size=1em] }
[size=1em] //加载key
[size=1em] if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
[size=1em] //根据不同的类型的value加载value值
[size=1em] if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
[size=1em] //如果为redis 主 则忽略过期key,并将key val的引用计数减少
[size=1em] if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
[size=1em] decrRefCount(key);
[size=1em] decrRefCount(val);
[size=1em] continue;
[size=1em] }
[size=1em] //将key val加到指定的db中
[size=1em] dbAdd(db,key,val);
[size=1em] //如果存在过期时间 则设置过期
[size=1em] if (expiretime != -1) setExpire(db,key,expiretime);
[size=1em] //加入完成减少key的引用计数
[size=1em] decrRefCount(key);
[size=1em] }
[size=1em] //验证redis 版本,验证校验和
[size=1em] if (rdbver >= 5 && server.rdb_checksum) {
[size=1em] uint64_t cksum, expected = rdb.cksum;
[size=1em] if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
[size=1em] memrev64ifbe(&cksum);
[size=1em] if (cksum == 0) {
[size=1em] redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
[size=1em] } else if (cksum != expected) {
[size=1em] redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
[size=1em] exit(1);
[size=1em] }
[size=1em] }
[size=1em] fclose(fp);
[size=1em] //标志结束
[size=1em] stopLoading();
[size=1em] return REDIS_OK;
[size=1em]eoferr: /* unexpected end of file is handled here with a fatal exit */
[size=1em] redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
[size=1em] exit(1);
[size=1em] return REDIS_ERR; /* Just to avoid warning */
[size=1em]}
|
Redis会对不同类型的 redis object 进行不同的处理,将会在下一篇文章介绍每种类型的压缩以及处理方式。
Redis2.8.9源码 src/rdb.h src/rdb.c 流程图工具
|