您好,欢迎访问一九零五行业门户网

php自己实现memcached的队列类

/*
 * Memcache-backed FIFO queue.
 *
 * Items are stored under numbered keys on one of two "sides" ('a' and 'b').
 * Writers append to the side that is currently on duty; when that side
 * reaches maxnum entries the duty switches to the other side. Readers drain
 * the previous side first and then the current one. Mutating operations are
 * serialised through a lock key created with memcache_add().
 *
 * Example:
 *     $obj = new memcachequeue('myqueue');
 *     $obj->add('1asdf');
 *     $obj->getqueuelength();
 *     $obj->read(11);
 *     $obj->get(8);
 */
class memcachequeue
{
    public static $client;   // memcache client connection
    public $access;          // true while this instance holds the queue lock
    private $currentside;    // side currently on duty: 'a' or 'b'
    private $lastside;       // side that was previously on duty: 'a' or 'b'
    private $sideahead;      // head index of side a
    private $sideatail;      // tail index of side a
    private $sidebhead;      // head index of side b
    private $sidebtail;      // tail index of side b
    private $currenthead;    // head index of the current side
    private $currenttail;    // tail index of the current side
    private $lasthead;       // head index of the previous side
    private $lasttail;       // tail index of the previous side
    private $expire;         // expiry in seconds, 1..2592000 (30 days); 0 = never expire
    private $sleeptime;      // microseconds to sleep between lock attempts
    private $queuename;      // queue name, must be unique
    private $retrynum;       // maximum lock attempts (about 10 x expected concurrency)

    const maxnum = 2000;                   // max entries per side (suggested ceiling: 10k)
    const head_key = '_lkkqueuehead_';     // suffix of the head-index key
    const tail_key = '_lkkqueuetail_';     // suffix of the tail-index key
    const valu_key = '_lkkqueuevalu_';     // suffix of the value keys
    const lock_key = '_lkkqueuelock_';     // suffix of the lock key
    const side_key = '_lkkqueueside_';     // suffix of the duty-side key

    /**
     * Connects to memcache and loads the queue pointers.
     *
     * If the connection fails the constructor returns early and the object
     * is left without a usable client.
     *
     * @param string       $queuename Queue name.
     * @param string|int   $expire    Expiry in seconds; '' (the default) selects 3600, 0 means never.
     * @param array|string $config    Server: array('host' => ..., 'port' => ...) or 'host:port';
     *                                empty selects localhost:11211.
     */
    public function __construct($queuename = '', $expire = '', $config = '')
    {
        if (empty($config)) {
            self::$client = memcache_pconnect('localhost', 11211);
        } elseif (is_array($config)) { // array('host'=>'127.0.0.1','port'=>'11211')
            self::$client = memcache_pconnect($config['host'], $config['port']);
        } elseif (is_string($config)) { // 127.0.0.1:11211
            $tmp = explode(':', $config);
            $conf = array(
                'host' => isset($tmp[0]) ? $tmp[0] : '127.0.0.1',
                'port' => isset($tmp[1]) ? $tmp[1] : '11211',
            );
            self::$client = memcache_pconnect($conf['host'], $conf['port']);
        }
        if (!self::$client) {
            return;
        }

        ignore_user_abort(true); // keep running if the client disconnects
        set_time_limit(0);       // remove the script execution time limit

        // The former test `empty($expire) && $expire != 0` only picked the
        // 3600 default on PHP 8+, where '' != 0 is true; state it explicitly.
        $expire = ($expire === '') ? 3600 : (int)$expire;

        $this->access = false;
        $this->sleeptime = 1000;
        $this->expire = $expire;
        $this->queuename = $queuename;
        $this->retrynum = 10000;

        // Create the duty-side marker if it does not exist yet (add is a no-op otherwise).
        memcache_add(self::$client, $queuename . self::side_key, 'a', false, $expire);

        // getheadntail() casts every pointer to int, so missing keys become 0
        // and no further defaulting is needed.
        $this->getheadntail($queuename);
    }

    /**
     * Reloads the head and tail indexes of both sides from memcache.
     *
     * @param string $queuename Queue name.
     * @return void
     */
    private function getheadntail($queuename)
    {
        $this->sideahead = (int)memcache_get(self::$client, $queuename . 'a' . self::head_key);
        $this->sideatail = (int)memcache_get(self::$client, $queuename . 'a' . self::tail_key);
        $this->sidebhead = (int)memcache_get(self::$client, $queuename . 'b' . self::head_key);
        $this->sidebtail = (int)memcache_get(self::$client, $queuename . 'b' . self::tail_key);
    }

    /**
     * Reads the side on duty and maps the cached pointers onto the
     * current/last properties. Any stored value other than 'a' is treated as 'b'.
     *
     * @return string Name of the current side.
     */
    public function getcurrentside()
    {
        $currentside = memcache_get(self::$client, $this->queuename . self::side_key);
        if ($currentside == 'a') {
            $this->currentside = 'a';
            $this->lastside = 'b';
            $this->currenthead = $this->sideahead;
            $this->currenttail = $this->sideatail;
            $this->lasthead = $this->sidebhead;
            $this->lasttail = $this->sidebtail;
        } else {
            $this->currentside = 'b';
            $this->lastside = 'a';
            $this->currenthead = $this->sidebhead;
            $this->currenttail = $this->sidebtail;
            $this->lasthead = $this->sideahead;
            $this->lasttail = $this->sideatail;
        }
        return $this->currentside;
    }

    /**
     * Acquires the queue lock, retrying up to retrynum times.
     *
     * @return boolean True when the lock was taken; false when this instance
     *                 already holds it or the retries ran out.
     */
    private function getlock()
    {
        if ($this->access !== false) {
            return false;
        }

        $attempts = 0;
        // memcache_add() only succeeds when the key is absent, which makes it the lock primitive.
        while (!memcache_add(self::$client, $this->queuename . self::lock_key, 1, false, $this->expire)) {
            usleep($this->sleeptime);
            $attempts++;
            if ($attempts > $this->retrynum) {
                return false;
            }
        }
        return $this->access = true;
    }

    /**
     * Releases the queue lock.
     *
     * @return void
     */
    private function unlock()
    {
        memcache_delete(self::$client, $this->queuename . self::lock_key);
        $this->access = false;
    }

    /**
     * Appends a value to the queue.
     *
     * @param mixed $data Value to store.
     * @return boolean True when stored; false when the lock could not be
     *                 taken, both sides are full, or the value key already exists.
     */
    public function add($data)
    {
        if (!$this->getlock()) {
            return false;
        }
        $this->getheadntail($this->queuename);
        $this->getcurrentside();

        if ($this->isfull()) {
            $this->unlock();
            return false;
        }

        if ($this->currenttail >= self::maxnum) {
            // Current side is full: switch sides while still holding the
            // lock so concurrent writers cannot flip it twice, then retry.
            $this->changecurrentside();
            $this->unlock();
            return $this->add($data);
        }

        $result = false;
        $value_key = $this->queuename . $this->currentside . self::valu_key . $this->currenttail;
        if (memcache_add(self::$client, $value_key, $data, false, $this->expire)) {
            $this->changetail();
            $result = true;
        }

        $this->unlock();
        return $result;
    }

    /**
     * Removes and returns up to $length items from the front of the queue.
     *
     * @param int $length Number of items; 0 means everything.
     * @return array|false Result of the multi-key memcache_get(), or false when
     *                     $length is not numeric, the lock fails or the queue is empty.
     */
    public function get($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (empty($length)) {
            $length = self::maxnum * 2; // default: read everything
        }
        if (!$this->getlock()) {
            return false;
        }

        // Refresh the pointers first; isempty() works on the cached values
        // and would otherwise miss items added by other processes.
        $this->getheadntail($this->queuename);
        if ($this->isempty()) {
            $this->unlock();
            return false;
        }

        $keyarray = $this->getkeyarray($length);
        $keys = $keyarray['keys'];
        $this->changehead($this->lastside, $keyarray['lastkey'], $keyarray['lastcount']);
        $this->changehead($this->currentside, $keyarray['currentkey'], $keyarray['currentcount']);

        $data = @memcache_get(self::$client, $keys);
        foreach ($keys as $v) { // delete what was just fetched
            @memcache_delete(self::$client, $v, 0);
        }
        $this->unlock();
        return $data;
    }

    /**
     * Returns up to $length items from the front of the queue without
     * removing them. No lock is taken.
     *
     * @param int $length Number of items; 0 means everything.
     * @return array|false Result of the multi-key memcache_get(), or false
     *                     when $length is not numeric.
     */
    public function read($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (empty($length)) {
            $length = self::maxnum * 2; // default: read everything
        }
        $keyarray = $this->getkeyarray($length);
        return @memcache_get(self::$client, $keyarray['keys']);
    }

    /**
     * Builds the list of value keys for the first $length queue positions,
     * previous side first, then the current side.
     *
     * @param int $length Number of positions wanted.
     * @return array 'keys'         => list of memcache keys,
     *               'lastkey'      => last index examined on the previous side,
     *               'lastcount'    => number of keys taken from the previous side,
     *               'currentkey'   => last index examined on the current side,
     *               'currentcount' => number of keys taken from the current side.
     */
    private function getkeyarray($length)
    {
        $result = array(
            'keys' => array(),
            'lastkey' => 0,
            'lastcount' => 0,
            'currentkey' => 0,
            'currentcount' => 0,
        );
        $this->getheadntail($this->queuename);
        $this->getcurrentside();
        if (empty($length)) {
            return $result;
        }

        // Keys from the previous side come first.
        for ($i = 0; $i < $length; $i++) {
            $result['lastkey'] = $this->lasthead + $i;
            if ($result['lastkey'] >= $this->lasttail) {
                break;
            }
            $result['keys'][] = $this->queuename . $this->lastside . self::valu_key . $result['lastkey'];
        }
        $result['lastcount'] = $i;

        // The remainder is taken from the current side.
        $remaining = $length - $i;
        for ($k = 0; $k < $remaining; $k++) {
            $result['currentkey'] = $this->currenthead + $k;
            if ($result['currentkey'] >= $this->currenttail) {
                break;
            }
            $result['keys'][] = $this->queuename . $this->currentside . self::valu_key . $result['currentkey'];
        }
        $result['currentcount'] = $k;

        return $result;
    }

    /**
     * Increments the tail index of the current side by one. Called from
     * add() while the lock is held.
     *
     * @return void
     */
    private function changetail()
    {
        $tail_key = $this->queuename . $this->currentside . self::tail_key;
        // Create the counter at 0 when missing; add fails harmlessly when it exists.
        memcache_add(self::$client, $tail_key, 0, false, $this->expire);
        $next = memcache_get(self::$client, $tail_key) + 1;
        memcache_set(self::$client, $tail_key, $next, false, $this->expire);
    }

    /**
     * Moves the head of a side past the given index, or resets the side once
     * the index has reached its tail.
     *
     * @param string $side      Side to update.
     * @param int    $headvalue Last index examined on that side.
     * @param int    $consumed  Number of items actually taken from that side;
     *                          lets index 0 be told apart from "side untouched".
     * @return false|null False when there is nothing to update.
     */
    private function changehead($side, $headvalue, $consumed = 0)
    {
        // Index 0 with nothing consumed means the side was not touched.
        if ($headvalue < 1 && $consumed < 1) {
            return false;
        }
        $head_key = $this->queuename . $side . self::head_key;
        $tail_key = $this->queuename . $side . self::tail_key;
        $sidetail = memcache_get(self::$client, $tail_key);
        if ($headvalue < $sidetail) {
            memcache_set(self::$client, $head_key, $headvalue + 1, false, $this->expire);
        } else {
            $this->resetside($side);
        }
    }

    /**
     * Resets a side by setting its head and tail indexes to 0.
     *
     * @param string $side Side to reset.
     * @return void
     */
    private function resetside($side)
    {
        $head_key = $this->queuename . $side . self::head_key;
        $tail_key = $this->queuename . $side . self::tail_key;
        memcache_set(self::$client, $head_key, 0, false, $this->expire);
        memcache_set(self::$client, $tail_key, 0, false, $this->expire);
    }

    /**
     * Switches the duty to the other side and stores the new marker.
     *
     * @return string Name of the new current side.
     */
    private function changecurrentside()
    {
        $stored = memcache_get(self::$client, $this->queuename . self::side_key);
        $next = ($stored == 'a') ? 'b' : 'a';
        memcache_set(self::$client, $this->queuename . self::side_key, $next, false, $this->expire);
        $this->currentside = $next;
        return $this->currentside;
    }

    /**
     * Tells whether both sides have reached maxnum, based on the cached tail indexes.
     *
     * @return boolean
     */
    public function isfull()
    {
        return $this->sideatail >= self::maxnum && $this->sidebtail >= self::maxnum;
    }

    /**
     * Tells whether both cached tail indexes are 0, i.e. nothing has been
     * written since the sides were last reset.
     *
     * @return boolean
     */
    public function isempty()
    {
        return !($this->sideatail > 0 || $this->sidebtail > 0);
    }

    /**
     * Returns the queue length computed from the head/tail indexes.
     * This is a theoretical length: entries that expired in memcache are
     * still counted, so the real length is less than or equal to it.
     *
     * @return int
     */
    public function getqueuelength()
    {
        $this->getheadntail($this->queuename);
        $this->getcurrentside();
        $sidealength = $this->sideatail - $this->sideahead;
        $sideblength = $this->sidebtail - $this->sidebhead;
        return $sidealength + $sideblength;
    }

    /**
     * Deletes every value key of this queue and resets both sides; only the
     * head, tail and side keys remain.
     *
     * @return boolean False when the lock could not be taken, true otherwise.
     */
    public function clear()
    {
        if (!$this->getlock()) {
            return false;
        }
        for ($i = 0; $i < self::maxnum; $i++) {
            @memcache_delete(self::$client, $this->queuename . 'a' . self::valu_key . $i, 0);
            @memcache_delete(self::$client, $this->queuename . 'b' . self::valu_key . $i, 0);
        }
        // Reset the pointers before releasing the lock so no writer can slip in between.
        $this->resetside('a');
        $this->resetside('b');
        $this->unlock();
        return true;
    }

    /**
     * Flushes the connected memcache server via memcache_flush(); this is
     * not limited to the keys of this queue.
     *
     * @return void
     */
    public function memflush()
    {
        memcache_flush(self::$client);
    }
}
其它类似信息

推荐信息