摘要:前些天一台redis机器硬盘写满了,主要是由于程序bug导致备份量激增,而恰好监控程序的通知机制也罢工了,于是第一次体验到了redis的罢工(只读不写)。现在我们来看下在磁盘写满后redis的处理机制:save流程:servercron->rdbsavebackground->rdbsave;save后流程:servercron->backgroundsavedonehandler。
前些天一台redis机器硬盘写满了,主要是由于程序bug导致备份量激增,而恰好监控程序的通知机制也罢工了,于是第一次体验到了redis的罢工(只读不写)。
现在我们来看下在磁盘写满后redis的处理机制:
save流程:servercron->rdbsavebackground->rdbsave
save后流程:servercron->backgroundsavedonehandler
上述流程产生的结果就是server.lastbgsave_status = redis_err。
受其影响,processcommand和luaredisgenericcommand在遇到写操作时都不会执行实际写入:processcommand向客户端回复错误后直接返回redis_ok,luaredisgenericcommand则向lua脚本抛出错误后跳转到cleanup。
1.rdbsave中任何一步写入出错(例如磁盘写满)都会返回redis_err
int rdbsave(char *filename) { dictiterator *di = null; dictentry *de; char tmpfile[256]; char magic[10]; int j; long long now = mstime(); file *fp; rio rdb; uint64_t cksum; snprintf(tmpfile,256,temp-%d.rdb, (int) getpid()); fp = fopen(tmpfile,w); if (!fp) { redislog(redis_warning, failed opening .rdb for saving: %s, strerror(errno)); return redis_err; } rioinitwithfile(&rdb,fp); if (server.rdb_checksum) rdb.update_cksum = riogenericupdatechecksum; snprintf(magic,sizeof(magic),redis%04d,redis_rdb_version); if (rdbwriteraw(&rdb,magic,9) == -1) goto werr; for (j = 0; j dict; if (dictsize(d) == 0) continue; di = dictgetsafeiterator(d); if (!di) { fclose(fp); return redis_err; } /* write the select db opcode */ if (rdbsavetype(&rdb,redis_rdb_opcode_selectdb) == -1) goto werr; if (rdbsavelen(&rdb,j) == -1) goto werr; /* iterate this db writing every entry */ while((de = dictnext(di)) != null) { sds keystr = dictgetkey(de); robj key, *o = dictgetval(de); long long expire; initstaticstringobject(key,keystr); expire = getexpire(db,&key); if (rdbsavekeyvaluepair(&rdb,&key,o,expire,now) == -1) goto werr; } dictreleaseiterator(di); } di = null; /* so that we don't release it again on error. */ /* eof opcode */ if (rdbsavetype(&rdb,redis_rdb_opcode_eof) == -1) goto werr; /* crc64 checksum. it will be zero if checksum computation is disabled, the * loading code skips the check in this case. */ cksum = rdb.cksum; memrev64ifbe(&cksum); if (riowrite(&rdb,&cksum,8) == 0) goto werr; /* make sure data will not remain on the os's output buffers */ if (fflush(fp) == eof) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == eof) goto werr; /* use rename to make sure the db file is changed atomically only * if the generate db file is ok. 
*/ if (rename(tmpfile,filename) == -1) { redislog(redis_warning,error moving temp db file on the final destination: %s, strerror(errno)); unlink(tmpfile); return redis_err; } redislog(redis_notice,db saved on disk); server.dirty = 0; server.lastsave = time(null); server.lastbgsave_status = redis_ok; return redis_ok;werr: fclose(fp); unlink(tmpfile); redislog(redis_warning,write error saving db on disk: %s, strerror(errno)); if (di) dictreleaseiterator(di); return redis_err;}
2.rdbsavebackground中,如果子进程调用rdbsave返回redis_err,那么子进程以退出码1退出(exitfromchild(1))
int rdbsavebackground(char *filename) { pid_t childpid; long long start; if (server.rdb_child_pid != -1) return redis_err; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(null); start = ustime(); if ((childpid = fork()) == 0) { int retval; /* child */ closelisteningsockets(0); redissetproctitle(redis-rdb-bgsave); retval = rdbsave(filename); if (retval == redis_ok) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { redislog(redis_notice, rdb: %zu mb of memory used by copy-on-write, private_dirty/(1024*1024)); } } exitfromchild((retval == redis_ok) ? 0 : 1); //进程退出时返回0/1 } else { /* parent */ server.stat_fork_time = ustime()-start; if (childpid == -1) { server.lastbgsave_status = redis_err; redislog(redis_warning,can't save in background: fork: %s, strerror(errno)); return redis_err; } redislog(redis_notice,background saving started by pid %d,childpid); server.rdb_save_time_start = time(null); server.rdb_child_pid = childpid; updatedictresizepolicy(); return redis_ok; } return redis_ok; /* unreached */}
3.bgsave完成后,servercron中得到bgsave子进程的返回码进行后续处理
/* Check if a background saving or aof rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
    int statloc = 0;
    pid_t pid;

    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        if (pid == -1) {
            /* wait3() failed and statloc was not filled in. Without this
             * branch a -1 result would compare equal to an unset
             * rdb_child_pid (-1) and run the save-done handler. */
            redislog(redis_warning,"wait3() returned an error: %s",
                strerror(errno));
        } else if (pid == server.rdb_child_pid) {
            /* Continue according to the bgsave child's exit code and
             * whether it was terminated by a signal. */
            backgroundsavedonehandler(exitcode,bysignal);
        } else if (pid == server.aof_child_pid) {
            backgroundrewritedonehandler(exitcode,bysignal);
        } else {
            redislog(redis_warning,
                "warning, detected child with unmatched pid: %ld",
                (long)pid);
        }
        updatedictresizepolicy();
    }
}
4.如果子进程非信号结束,并且exitcode非0,那么设置bgsave状态为redis_err
/*
 * Handle the termination of the background save child.
 *
 * exitcode is the child's exit status and bysignal is the number of the
 * signal that terminated it, or 0 when it was not terminated by a signal.
 *
 * - Clean exit with status 0: the save succeeded; the dirty counter is
 *   reduced by its value at the time the save started and
 *   server.lastbgsave_status becomes redis_ok.
 * - Clean exit with a non-zero status: server.lastbgsave_status becomes
 *   redis_err.
 * - Terminated by a signal: the child's temporary file is removed and
 *   server.lastbgsave_status becomes redis_err unless the signal was
 *   SIGUSR1.
 *
 * In every case the child bookkeeping fields are reset and the slaves
 * waiting for this save are notified of the outcome.
 */
void backgroundsavedonehandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        redislog(redis_notice,
            "background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = redis_ok;
    } else if (!bysignal && exitcode != 0) {
        redislog(redis_warning, "background saving error");
        /* This is the state change that later makes writes be refused. */
        server.lastbgsave_status = redis_err;
    } else {
        mstime_t latency;

        redislog(redis_warning,
            "background saving terminated by signal %d", bysignal);
        latencystartmonitor(latency);
        rdbremovetempfile(server.rdb_child_pid);
        latencyendmonitor(latency);
        latencyaddsampleifneeded("rdb-unlink-temp-file",latency);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = redis_err;
    }
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;
    /* Possibly there are slaves waiting for a bgsave in order to be served
     * (the first stage of sync is a bulk transfer of dump.rdb) */
    updateslaveswaitingbgsave((!bysignal && exitcode == 0) ? redis_ok : redis_err);
}
5.processcommand中,如果bgsave状态为redis_err(且开启了stop_writes_on_bgsave_err、配置了save参数)或aof写入出错,并且当前实例是master,那么对写操作(以及ping)回复错误并直接返回redis_ok,不执行命令
/* Don't accept write commands if there are problems persisting on disk
 * and if this is a master instance. The same check also rejects ping, so
 * the error is visible to a client that only sends ping. */
if (((server.stop_writes_on_bgsave_err &&
      server.saveparamslen > 0 &&
      server.lastbgsave_status == redis_err) ||
     server.aof_last_write_status == redis_err) &&
    server.masterhost == NULL &&
    ((c->cmd->flags & redis_cmd_write) ||
     c->cmd->proc == pingcommand))
{
    flagtransaction(c);
    if (server.aof_last_write_status == redis_ok) {
        /* The aof is fine, so the failed background save is the cause. */
        addreply(c, shared.bgsaveerr);
    } else {
        addreplysds(c,
            sdscatprintf(sdsempty(),
            "-misconf errors writing to the aof file: %s\r\n",
            strerror(server.aof_last_write_errno)));
    }
    /* The command is not executed; only the error reply is queued. */
    return redis_ok;
}
6.luaredisgenericcommand中判定cmd是写操作的话,屏蔽 /* write commands are forbidden against read-only slaves, or if a * command marked as non-deterministic was already called in the context * of this script. */ if (cmd->flags & redis_cmd_write) { if (server.lua_random_dirty) { luapusherror(lua, write commands not allowed after non deterministic commands); goto cleanup; } else if (server.masterhost && server.repl_slave_ro && !server.loading && !(server.lua_caller->flags & redis_master)) { luapusherror(lua, shared.roslaveerr->ptr); goto cleanup; } else if (server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == redis_err) { luapusherror(lua, shared.bgsaveerr->ptr); goto cleanup; } }cleanup: /* clean up. command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ for (j = 0; j argc; j++) { robj *o = c->argv[j]; /* try to cache the object in the cached_objects array. * the object must be small, sds-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j refcount == 1 && o->encoding == redis_encoding_raw && sdslen(o->ptr) ptr))-(sizeof(struct sdshdr))); if (cached_objects[j]) decrrefcount(cached_objects[j]); cached_objects[j] = o; cached_objects_len[j] = sh->free + sh->len; } else { decrrefcount(o); } } if (c->argv != argv) { zfree(c->argv); argv = null; } if (raise_error) { /* if we are here we should have an error in the stack, in the * form of a table with an err field. extract the string to * return the plain error. */ lua_pushstring(lua,err); lua_gettable(lua,-2); return lua_error(lua); } return 1;