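VM (virtual memory) is a feature added in Redis 2.0. Before VM existed, Redis kept all the data of every db in memory, so memory use kept growing the longer the server ran. At the same time, clients clearly access some data much more often than the rest. Redis introduced VM to try to address this. In short, VM lets Redis store rarely accessed values on disk. The keys of all values stay in memory, however, so that looking up a swapped-out value performs about the same with VM enabled as without it.
VM is one of the most complex modules in Redis, so we cover it in three parts. This part introduces the main data structures, the next part covers the non-blocking mode, and the last part covers the multi-threaded mode.
Let's start with redisObject, the generic object structure in Redis: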
// When VM is enabled, where the object currently lives
#define REDIS_VM_MEMORY 0       /* The object is on memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */

/* The VM object structure */
struct redisObjectVM {
    off_t page;         /* the page at which the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;

/* The actual Redis Object */
// Generic object type
// For a key, extra flags record the location and type of its value
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype;    /* If this object is a key, and value is swapped out,
                             * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;
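In robj, type holds the object's type, such as string, list, or set. storage records where the value belonging to this key object currently is: in memory, on disk, being swapped out to disk, or being loaded. vtype is the type of the value belonging to this key object. page and usedpages record where that value is stored in the swap file (the first page and the number of pages it occupies), and atime is the time the value was last accessed. So when the storage of a key object is REDIS_VM_SWAPPED, the key's value is no longer in memory: it has to be loaded from page in the VM file, its type is vtype, and it occupies usedpages pages.
When an object is created, the robj is allocated with the size that matches whether VM is enabled.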
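To make the role of storage, page and usedpages concrete, here is a minimal sketch of how a swapped-out value could be located in the swap file. The names demo_key and demo_swap_extent are hypothetical and not part of Redis; the sketch assumes that page numbers are simply multiplied by the configured page size to get a byte offset.

```c
#include <assert.h>
#include <sys/types.h>

/* Storage states, using the same numeric values as REDIS_VM_* above. */
enum { DEMO_VM_MEMORY = 0, DEMO_VM_SWAPPED = 1,
       DEMO_VM_SWAPPING = 2, DEMO_VM_LOADING = 3 };

/* Hypothetical, trimmed-down key object: only the fields discussed above. */
typedef struct demo_key {
    unsigned char storage;  /* where the value is */
    unsigned char vtype;    /* type of the swapped-out value */
    off_t page;             /* first page of the value in the swap file */
    off_t usedpages;        /* number of contiguous pages it occupies */
} demo_key;

/* If the value is on disk, store the byte offset and the reserved length
 * (an upper bound on the serialized size) and return 1. Return 0 when the
 * value is not in the swapped state. */
static int demo_swap_extent(const demo_key *k, off_t page_size,
                            off_t *offset, off_t *len) {
    assert(page_size > 0);
    if (k->storage != DEMO_VM_SWAPPED) return 0;
    *offset = k->page * page_size;
    *len = k->usedpages * page_size;
    return 1;
}
```

With a page size of 32 bytes, a key whose value sits at page 10 and uses 3 pages maps to byte offset 320 and a reserved length of 96 bytes.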
static robj *createObject(int type, void *ptr) {
    ---
    else {
        if (server.vm_enabled) {
            pthread_mutex_unlock(&server.obj_freelist_mutex);
            o = zmalloc(sizeof(*o));
        } else {
            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
        }
    }
    ---
    if (server.vm_enabled) {
        /* Note that this code may run in the context of an I/O thread
         * and accessing to server.unixtime in theory is an error
         * (no locks). But in practice this is safe, and even if we read
         * garbage Redis will not fail, as it's just a statistical info */
        o->vm.atime = server.unixtime;
        o->storage = REDIS_VM_MEMORY;
    }
    return o;
}
All VM-related state is kept in the following fields of redisServer.
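The allocation trick in createObject works because the VM fields are the last member of the struct. The following is a small sketch of the same idea using hypothetical names (demo_obj, demo_vm, demo_create) and plain malloc instead of zmalloc; it is an illustration under those assumptions, not the Redis code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>

/* Hypothetical stand-in for struct redisObjectVM. */
struct demo_vm { off_t page; off_t usedpages; time_t atime; };

/* Hypothetical stand-in for robj; the VM fields must stay the last member. */
typedef struct demo_obj {
    void *ptr;
    unsigned char type;
    int refcount;
    struct demo_vm vm;
} demo_obj;

/* Bytes to allocate: the full struct with VM, the struct minus its tail without. */
static size_t demo_obj_size(int vm_enabled) {
    return vm_enabled ? sizeof(demo_obj)
                      : sizeof(demo_obj) - sizeof(struct demo_vm);
}

static demo_obj *demo_create(int vm_enabled, void *ptr) {
    demo_obj *o = malloc(demo_obj_size(vm_enabled));
    if (o == NULL) return NULL;
    o->ptr = ptr;
    o->type = 0;
    o->refcount = 1;
    if (vm_enabled) {
        /* Only touch the tail when it was actually allocated. */
        o->vm.page = 0;
        o->vm.usedpages = 0;
        o->vm.atime = time(NULL);
    }
    return o;
}
```

The shortened allocation is only safe as long as no code path reads or writes the vm member of an object created with VM disabled, which is why every access to it is guarded by the enabled flag.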
/* Global server state structure */
struct redisServer {
    ---
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page; /* Next probably empty page */
    off_t vm_near_pages; /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap; /* Bitmap of free/used pages */
    time_t unixtime;    /* Unix time sampled every second. */
    /* Virtual memory I/O threads stuff */
    /* An I/O thread process an element taken from the io_jobs queue and
     * put the result of the operation in the io_done list. While the
     * job is being processed, it's put on io_processing queue. */
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
    int io_active_threads; /* Number of running I/O threads */
    int vm_max_threads; /* Max number of I/O threads running at the same time */
    /* Our main thread is blocked on the event loop, looking for sockets ready
     * to be read or written, so when a threaded I/O operation is ready to be
     * processed by the main thread, the I/O thread will use a unix pipe to
     * awake the main thread. The followings are the two pipe FDs. */
    int io_ready_pipe_read;
    int io_ready_pipe_write;
    /* Virtual memory stats */
    unsigned long long vm_stats_used_pages;
    unsigned long long vm_stats_swapped_objects;
    unsigned long long vm_stats_swapouts;
    unsigned long long vm_stats_swapins;
    ---
};
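vm_fp and vm_fd both refer to the VM file on disk (vm_fp is the stdio FILE pointer and vm_fd is the file descriptor of the same file); the VM file is read and written through them. vm_bitmap tracks the allocation state of every page in the VM file: a bit set to 0 means the page is free, 1 means it is in use. The size of each page is set with vm-page-size and the number of pages with vm-pages. Note that a page can hold at most one object, while one object can span several contiguous pages. unixtime is just a cached time value, which is used when estimating how recently a value was accessed. The fields that follow are for swapping values in and out with multiple threads. In threaded mode, each swap-in or swap-out of a value is treated as a job, and jobs come in the following types: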
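The page bitmap described above can be illustrated with a short sketch. The names demo_swap, demo_page_used, demo_mark_pages and demo_find_free are hypothetical, and the search is a plain first-fit scan; it does not try to reproduce the vm_next_page and vm_near_pages heuristics that Redis uses when choosing where to look.

```c
#include <assert.h>
#include <stdlib.h>
#include <sys/types.h>

/* Hypothetical page table: one bit per page, 0 = free, 1 = used. */
typedef struct demo_swap {
    unsigned char *bitmap;
    off_t npages;
} demo_swap;

/* Allocate a zeroed bitmap of (npages+7)/8 bytes. Returns 1 on success. */
static int demo_swap_init(demo_swap *s, off_t npages) {
    assert(npages > 0);
    s->bitmap = calloc((size_t)((npages + 7) / 8), 1);
    s->npages = npages;
    return s->bitmap != NULL;
}

/* Return 1 if the page is marked used, 0 if it is free. */
static int demo_page_used(const demo_swap *s, off_t page) {
    assert(page >= 0 && page < s->npages);
    return (s->bitmap[page / 8] >> (page % 8)) & 1;
}

/* Mark count contiguous pages starting at first as used (1) or free (0). */
static void demo_mark_pages(demo_swap *s, off_t first, off_t count, int used) {
    for (off_t p = first; p < first + count; p++) {
        assert(p >= 0 && p < s->npages);
        if (used) s->bitmap[p / 8] |= (unsigned char)(1 << (p % 8));
        else      s->bitmap[p / 8] &= (unsigned char)~(1 << (p % 8));
    }
}

/* First-fit search for n contiguous free pages.
 * Returns the first page of the run, or -1 if no run is long enough. */
static off_t demo_find_free(const demo_swap *s, off_t n) {
    off_t run = 0;
    assert(n > 0);
    for (off_t p = 0; p < s->npages; p++) {
        run = demo_page_used(s, p) ? 0 : run + 1;
        if (run == n) return p - n + 1;
    }
    return -1;
}
```

Because an object must occupy contiguous pages, a swap file can have enough free pages in total and still be unable to hold a large value; the -1 return covers that case.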
/* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;
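A job of type REDIS_IOJOB_LOAD loads a value, and a job of type REDIS_IOJOB_DO_SWAP swaps a value out. Before a value is swapped out, a job of type REDIS_IOJOB_PREPARE_SWAP must first be created to compute how many swap pages the value needs.
Whichever of the three types it is, a new job is placed on the io_newjobs queue by queueIOJob. The thread entry function IOThreadEntryPoint moves the job from io_newjobs to server.io_processing, performs the work for the job's type (loading the value, computing the number of swap pages the value needs, or swapping the value out), and then moves the job from server.io_processing to io_processed. It then writes one byte to the pipe behind server.io_ready_pipe_write (io_ready_pipe_read and io_ready_pipe_write are the two ends of the pipe), which wakes the main thread so that vmThreadedIOCompletedJob runs and does the follow-up work.
io_ready_clients holds the list of clients that can resume running (they were blocked earlier while waiting for values). The remaining fields are the locks that protect the multi-threaded code and the global VM statistics.
VM is initialized in vmInit, which mainly initializes the structures described above. Beyond that, its most important task is to register vmThreadedIOCompletedJob as the handler for the read event on the pipe; that function runs whenever the pipe becomes readable and is central to how the threaded mode works.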
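The page count that a prepare-swap job produces comes down to a rounded-up division of the value's serialized size by the page size. The sketch below shows only that arithmetic; demo_pages_needed is a hypothetical helper, and how the serialized size itself is obtained is outside its scope.

```c
#include <assert.h>
#include <sys/types.h>

/* Number of whole pages needed to hold `bytes` bytes of serialized value,
 * rounding up so a partially filled last page still counts as one page. */
static off_t demo_pages_needed(off_t bytes, off_t page_size) {
    assert(bytes >= 0 && page_size > 0);
    return (bytes + page_size - 1) / page_size;
}
```

With 32-byte pages, a 100-byte value needs 4 pages, and 28 bytes of the last page go unused because a page never holds more than one object.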
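The pipe-based wake-up described above can be sketched as follows. Everything here is a simplified assumption: demo_io, demo_io_thread and demo_wait_completed are hypothetical names, a counter stands in for the io_processed list, and the main-thread side blocks in read() instead of registering the read end with an event loop as Redis does.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical, reduced version of the shared I/O state. */
typedef struct demo_io {
    pthread_mutex_t mutex;  /* protects `processed` */
    int processed;          /* stands in for the io_processed list */
    int pipe_read;          /* read end, watched by the main thread */
    int pipe_write;         /* write end, used by I/O threads */
} demo_io;

/* Create the pipe and the mutex. Returns 1 on success, 0 on failure. */
static int demo_io_init(demo_io *io) {
    int fds[2];
    if (pipe(fds) == -1) return 0;
    io->pipe_read = fds[0];
    io->pipe_write = fds[1];
    io->processed = 0;
    return pthread_mutex_init(&io->mutex, NULL) == 0;
}

/* I/O thread: record one finished job, then write one byte to wake the
 * main thread. */
static void *demo_io_thread(void *arg) {
    demo_io *io = arg;
    assert(io != NULL);
    pthread_mutex_lock(&io->mutex);
    io->processed++;
    pthread_mutex_unlock(&io->mutex);
    if (write(io->pipe_write, "x", 1) != 1) return NULL;
    return io;
}

/* Main thread: wait for the wake-up byte, then collect finished jobs.
 * Returns the number of jobs collected, or -1 if the read failed. */
static int demo_wait_completed(demo_io *io) {
    char buf[1];
    int done;
    if (read(io->pipe_read, buf, 1) != 1) return -1;
    pthread_mutex_lock(&io->mutex);
    done = io->processed;
    io->processed = 0;
    pthread_mutex_unlock(&io->mutex);
    return done;
}
```

The byte itself carries no information; it only makes the read end readable, so a thread that is otherwise waiting on file descriptors notices that a job has finished.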
static void vmInit(void) {
    off_t totsize;
    int pipefds[2];
    size_t stacksize;
    struct flock fl;

    if (server.vm_max_threads != 0)
        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */

    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
    /* Try to open the old swap file, otherwise create it */
    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
        server.vm_fp = fopen(server.vm_swap_file,"w+b");
    }
    if (server.vm_fp == NULL) {
        redisLog(REDIS_WARNING,
            "Can't open the swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    }
    server.vm_fd = fileno(server.vm_fp);
    /* Lock the swap file for writing, this is useful in order to avoid
     * another instance to use the same swap file for a config error. */
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = fl.l_len = 0;
    if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
        redisLog(REDIS_WARNING,
            "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.",
            server.vm_swap_file, strerror(errno));
        exit(1);
    }
    /* Initialize */
    server.vm_next_page = 0;
    server.vm_near_pages = 0;
    server.vm_stats_used_pages = 0;
    server.vm_stats_swapped_objects = 0;
    server.vm_stats_swapouts = 0;
    server.vm_stats_swapins = 0;
    totsize = server.vm_pages*server.vm_page_size;
    redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
    if (ftruncate(server.vm_fd,totsize) == -1) {
        redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    } else {
        redisLog(REDIS_NOTICE,"Swap file allocated with success");
    }
    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
    redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
        (long long) (server.vm_pages+7)/8, server.vm_pages);
    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);

    /* Initialize threaded I/O (used by Virtual Memory) */
    server.io_newjobs = listCreate();
    server.io_processing = listCreate();
    server.io_processed = listCreate();
    server.io_ready_clients = listCreate();
    pthread_mutex_init(&server.io_mutex,NULL);
    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
    pthread_mutex_init(&server.io_swapfile_mutex,NULL);
    server.io_active_threads = 0;
    if (pipe(pipefds) == -1) {
        redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting.",
            strerror(errno));
        exit(1);
    }
    server.io_ready_pipe_read = pipefds[0];
    server.io_ready_pipe_write = pipefds[1];
    redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
    /* LZF requires a lot of stack */
    pthread_attr_init(&server.io_threads_attr);
    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
    /* Solaris may report a stacksize of 0, let's set it to 1 otherwise
     * multiplying it by 2 in the while loop later will not really help */
    if (!stacksize) stacksize = 1;
    while (stacksize

Original article: redis源代码分析23–vm(上). Thanks to the original author for sharing.