您好,欢迎访问一九零五行业门户网

redis源代码分析24–VM(中)

vm根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。 这一节主要介绍阻塞方式。 redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 v
vm根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。
这一节主要介绍阻塞方式。
redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 value(vmswaponeobjectblocking函数)。除此之外,redis只在servercron函数(该函数事件处理章节分析过)中换出value。我们来看看servercron中的处理代码,阻塞方式使用函数vmswaponeobjectblocking换出value,多线程方式使用函数vmswaponeobjectthreaded换出value。
static int servercron(struct aeeventloop *eventloop, long long id, void *clientdata) { --- /* swap a few keys on disk if we are over the memory limit and vm * is enbled. try to free objects from the free list first. */ if (vmcanswapout()) { while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) { --- if (tryfreeoneobjectfromfreelist() == redis_ok) continue; retval = (server.vm_max_threads == 0) ? vmswaponeobjectblocking() : vmswaponeobjectthreaded(); --- } } --- return 100;}
无论是阻塞方式vmswaponeobjectblocking换出value,还是多线程方式vmswaponeobjectthreaded换出value,最终都调用vmswaponeobject(调用参数不一样)来换出value。
vmswaponeobject会对每个db,随机选择5项,计算它的swappability,然后如果是多线程方式,则调用vmswapobjectthreaded来换出value,否则使用vmswapobjectblocking换出value。
static int vmswaponeobject(int usethreads) { int j, i; struct dictentry *best = null; double best_swappability = 0; redisdb *best_db = null; robj *key, *val; for (j = 0; j dict) == 0) continue; for (i = 0; i dict); key = dictgetentrykey(de); val = dictgetentryval(de); /* only swap objects that are currently in memory. * * also don't swap shared objects if threaded vm is on, as we * try to ensure that the main thread does not touch the * object while the i/o thread is using it, but we can't * control other keys without adding additional mutex. */ if (key->storage != redis_vm_memory || (server.vm_max_threads != 0 && val->refcount != 1)) { if (maxtries) i--; /* don't count this try */ continue; } val->vm.atime = key->vm.atime; /* atime is updated on key object */ swappability = computeobjectswappability(val); if (!best || swappability > best_swappability) { best = de; best_swappability = swappability; best_db = db; } } } if (best == null) return redis_err; key = dictgetentrykey(best); val = dictgetentryval(best); redislog(redis_debug,key with best swappability: %s, %f, key->ptr, best_swappability); /* unshare the key if needed */ if (key->refcount > 1) { robj *newkey = dupstringobject(key); decrrefcount(key); key = dictgetentrykey(best) = newkey; } /* swap it */ if (usethreads) { vmswapobjectthreaded(key,val,best_db); return redis_ok; } else { if (vmswapobjectblocking(key,val) == redis_ok) { dictgetentryval(best) = null; return redis_ok; } else { return redis_err; } }}
vmswapobjectblocking会在计算所需的交换页后,阻塞性的将value写到vm文件中(函数vmwriteobjectonswap),最后标记相应vm页为已使用。
static int vmswapobjectblocking(robj *key, robj *val) { off_t pages = rdbsavedobjectpages(val,null); off_t page; assert(key->storage == redis_vm_memory); assert(key->refcount == 1); if (vmfindcontiguouspages(&page,pages) == redis_err) return redis_err; if (vmwriteobjectonswap(val,page) == redis_err) return redis_err; key->vm.page = page; key->vm.usedpages = pages; key->storage = redis_vm_swapped; key->vtype = val->type; decrrefcount(val); /* deallocate the object from memory. */ vmmarkpagesused(page,pages); redislog(redis_debug,vm: object %s swapped out at %lld (%lld pages), (unsigned char*) key->ptr, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; return redis_ok;}
对于value的加载,如果是多线程方式,会使用blockclientonswappedkeys提前加载,但阻塞方式则只有到相应命令执行时才会加载。最终无论是阻塞方式还是多线程方式,都会调用lookupkey来查找key是否在内存中,若不在,则使用vmloadobject加载value,该函数是阻塞式的读入value。
static robj *lookupkey(redisdb *db, robj *key) { dictentry *de = dictfind(db->dict,key); if (de) { robj *key = dictgetentrykey(de); robj *val = dictgetentryval(de); if (server.vm_enabled) { if (key->storage == redis_vm_memory || key->storage == redis_vm_swapping) { /* if we were swapping the object out, stop it, this key * was requested. */ if (key->storage == redis_vm_swapping) vmcancelthreadediojob(key); /* update the access time of the key for the aging algorithm. */ key->vm.atime = server.unixtime; } else { int notify = (key->storage == redis_vm_loading); /* our value was swapped on disk. bring it at home. */ redisassert(val == null); val = vmloadobject(key); dictgetentryval(de) = val; /* clients blocked by the vm subsystem may be waiting for * this key... */ if (notify) handleclientsblockedonswappedkey(db,key); } } return val; } else { return null; }}
原文地址:redis源代码分析24–vm(中), 感谢原作者分享。
其它类似信息

推荐信息