前言模拟supervisor进程管理demo(简易实现)
没错,是造轮子!目的在于学习!
截图:
在图中自己实现了一个copy子进程的功能。如果用在amqp增减消费者时,我觉得应该会很有用。
实现1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制amqp的消费者进程管理服务。(2)模拟crontab定时服务。
知识点代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。
代码由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。
主进程代码:process.php
<?phprequire_once __dir__ . '/consumer.php';require_once __dir__ . '/streamconnection.php';require_once __dir__ . '/http.php';class process{ /** * 待启动的消费者数组 */ protected $consumers = array(); protected $childpids = array(); const ppid_file = __dir__ . '/process'; protected $serializerconsumer; public function __construct() { $this->consumers = $this->getconsumers(); } // 这里是个demo,实际可以用读取配置文件的方式。 public function getconsumers() { $consumer = new consumer([ 'program' => 'test', 'command' => '/usr/bin/php test.php', 'directory' => __dir__, 'logfile' => __dir__ . '/test.log', 'uniqid' => uniqid(), 'auto_restart' => false, ]); return [ $consumer->uniqid => $consumer, ]; } public function run() { if (empty($this->consumers)) { // consumer empty return; } if ($this->_notifymaster()) { // master alive return; } $pid = pcntl_fork(); if ($pid < 0) { exit; } elseif ($pid > 0) { exit; } if (!posix_setsid()) { exit; } $stream = new streamconnection('tcp://0.0.0.0:7865'); @cli_set_process_title('amqp master process'); // 将主进程id写入文件 file_put_contents(self::ppid_file, getmypid()); // master进程继续 while (true) { $this->init(); pcntl_signal_dispatch(); $this->waitpid(); // 如果子进程被全部回收,则主进程退出 // if (empty($this->childpids)) { // $stream->close($stream->getsocket()); // break; // } $stream->accept(function ($uniqid, $action) { $this->handle($uniqid, $action); return $this->display(); }); } } protected function init() { foreach ($this->consumers as &$c) { switch ($c->state) { case consumer::running: case consumer::stop: break; case consumer::nominal: case consumer::starting: $this->fork($c); break; case consumer::stoping: if ($c->pid && posix_kill($c->pid, sigterm)) { $this->reset($c, consumer::stop); } break; case consumer::restart: if (empty($c->pid)) { $this->fork($c); break; } if (posix_kill($c->pid, sigterm)) { $this->reset($c, consumer::stop); $this->fork($c); } break; default: break; } } } protected function reset(consumer $c, $state) { $c->pid = ''; $c->uptime = ''; $c->state = $state; $c->process = null; } protected function waitpid() { foreach ($this->childpids as $uniqid => $pid) { $result = pcntl_waitpid($pid, $status, wnohang); if ($result == $pid || $result == -1) { unset($this->childpids[$uniqid]); $c = &$this->consumers[$uniqid]; $state = pcntl_wifexited($status) ? consumer::exited : consumer::stop; $this->reset($c, $state); } } } /** * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程 */ private function _notifymaster() { $ppid = file_get_contents(self::ppid_file ); $isalive = $this->checkprocessalive($ppid); if (!$isalive) return false; return true; } public function checkprocessalive($pid) { if (empty($pid)) return false; $pidinfo = `ps co pid {$pid} | xargs`; $pidinfo = trim($pidinfo); $pattern = /.*?pid.*?(\d+).*?/; preg_match($pattern, $pidinfo, $matches); return empty($matches) ? false : ($matches[1] == $pid ? true : false); } /** * fork一个新的子进程 */ protected function fork(consumer $c) { $descriptorspec = [2 => ['file', $c->logfile, 'a'],]; $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory); if ($process) { $ret = proc_get_status($process); if ($ret['running']) { $c->state = consumer::running; $c->pid = $ret['pid']; $c->process = $process; $c->uptime = date('m-d h:i'); $this->childpids[$c->uniqid] = $ret['pid']; } else { $c->state = consumer::exited; proc_close($process); } } else { $c->state = consumer::error; } return $c; } public function display() { $location = 'http://127.0.0.1:7865'; $basepath = http::$basepath; $scriptname = isset($_server['script_name']) && !empty($_server['script_name']) && $_server['script_name'] != '/' ? $_server['script_name'] : '/index.php'; if ($scriptname == '/index.html') { return http::status_301($location); } $sourcepath = $basepath . $scriptname; if (!is_file($sourcepath)) { return http::status_404(); } ob_start(); include $sourcepath; $response = ob_get_contents(); ob_clean(); return http::status_200($response); } public function handle($uniqid, $action) { if (!empty($uniqid) && !isset($this->consumers[$uniqid])) { return; } switch ($action) { case 'refresh': break; case 'restartall': $this->killall(true); break; case 'stopall': $this->killall(); break; case 'stop': $c = &$this->consumers[$uniqid]; if ($c->state != consumer::running) break; $c->state = consumer::stoping; break; case 'start': $c = &$this->consumers[$uniqid]; if ($c->state == consumer::running) break; $c->state = consumer::starting; break; case 'restart': $c = &$this->consumers[$uniqid]; $c->state = consumer::restart; break; case 'copy': $c = $this->consumers[$uniqid]; $newc = clone $c; $newc->uniqid = uniqid('c'); $newc->state = consumer::nominal; $newc->pid = ''; $this->consumers[$newc->uniqid] = $newc; break; default: break; } } protected function killall($restart = false) { foreach ($this->consumers as &$c) { $c->state = $restart ? consumer::restart : consumer::stoping; } }}$cli = new process();$cli->run();
consumer消费者对象
<?phprequire_once __dir__ . '/baseobject.php';class consumer extends baseobject{ /** 开启多少个消费者 */ public $numprocs = 1; /** 当前配置的唯一标志 */ public $program; /** 执行的命令 */ public $command; /** 当前工作的目录 */ public $directory; /** 通过 $qos $queuename $duplicate 生成的 $queue */ public $queue; /** 程序执行日志记录 */ public $logfile = ''; /** 消费进程的唯一id */ public $uniqid; /** 进程idpid */ public $pid; /** 进程状态 */ public $state = self::nominal; /** 自启动 */ public $auto_restart = false; public $process; /** 启动时间 */ public $uptime; const running = 'running'; const stop = 'stoped'; const nominal = 'nominal'; const restart = 'restart'; const stoping = 'stoping'; const starting = 'stating'; const error = 'error'; const blocked = 'blocked'; const exited = 'exited'; const fatel = 'fatel';}
stream相关代码:streamconnection.php
<?phpclass streamconnection{ protected $socket; protected $timeout = 2; //s protected $client; public function __construct($host) { $this->socket = $this->connect($host); } public function connect($host) { $socket = stream_socket_server($host, $errno, $errstr); if (!$socket) { exit('stream error'); } stream_set_timeout($socket, $this->timeout); stream_set_chunk_size($socket, 1024); stream_set_blocking($socket, false); $this->client = [$socket]; return $socket; } public function accept(closure $callback) { $read = $this->client; if (stream_select($read, $write, $except, 1) < 1) return; if (in_array($this->socket, $read)) { $cs = stream_socket_accept($this->socket); $this->client[] = $cs; } foreach ($read as $s) { if ($s == $this->socket) continue; $header = fread($s, 1024); if (empty($header)) { $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); continue; } http::parse_http($header); $uniqid = isset($_get['uniqid']) ? $_get['uniqid'] : ''; $action = isset($_get['action']) ? $_get['action'] : ''; $response = $callback($uniqid, $action); $this->write($s, $response); $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); } } public function write($socket, $response) { $ret = fwrite($socket, $response, strlen($response)); } public function close($socket) { $flag = fclose($socket); } public function getsocket() { return $this->socket; }}
http响应代码:http.php
<?phpclass http{ public static $basepath = __dir__ . '/views'; public static $max_age = 120; //秒 /* * 函数: parse_http * 描述: 解析http协议 */ public static function parse_http($http) { // 初始化 $_post = $_get = $_cookie = $_request = $_session = $_files = array(); $globals['http_raw_post_data'] = ''; // 需要设置的变量名 $_server = array( 'query_string' => '', 'request_method' => '', 'request_uri' => '', 'server_protocol' => '', 'server_software' => '', 'server_name' => '', 'http_host' => '', 'http_user_agent' => '', 'http_accept' => '', 'http_accept_language' => '', 'http_accept_encoding' => '', 'http_cookie' => '', 'http_connection' => '', 'remote_addr' => '', 'remote_port' => '0', 'script_name' => '', 'http_referer' => '', 'content_type' => '', 'http_if_none_match' => '', ); // 将header分割成数组 list($http_header, $http_body) = explode(\r\n\r\n, $http, 2); $header_data = explode(\r\n, $http_header); list($_server['request_method'], $_server['request_uri'], $_server['server_protocol']) = explode(' ', $header_data[0]); unset($header_data[0]); foreach ($header_data as $content) { // \r\n\r\n if (empty($content)) { continue; } list($key, $value) = explode(':', $content, 2); $key = strtolower($key); $value = trim($value); switch ($key) { case 'host': $_server['http_host'] = $value; $tmp = explode(':', $value); $_server['server_name'] = $tmp[0]; if (isset($tmp[1])) { $_server['server_port'] = $tmp[1]; } break; case 'cookie': $_server['http_cookie'] = $value; parse_str(str_replace('; ', '&', $_server['http_cookie']), $_cookie); break; case 'user-agent': $_server['http_user_agent'] = $value; break; case 'accept': $_server['http_accept'] = $value; break; case 'accept-language': $_server['http_accept_language'] = $value; break; case 'accept-encoding': $_server['http_accept_encoding'] = $value; break; case 'connection': $_server['http_connection'] = $value; break; case 'referer': $_server['http_referer'] = $value; break; case 'if-modified-since': $_server['http_if_modified_since'] = $value; break; case 'if-none-match': $_server['http_if_none_match'] = $value; break; case 'content-type': if (!preg_match('/boundary=?(\s+)?/', $value, $match)) { $_server['content_type'] = $value; } else { $_server['content_type'] = 'multipart/form-data'; $http_post_boundary = '--' . $match[1]; } break; } } // script_name $_server['script_name'] = parse_url($_server['request_uri'], php_url_path); // query_string $_server['query_string'] = parse_url($_server['request_uri'], php_url_query); if ($_server['query_string']) { // $get parse_str($_server['query_string'], $_get); } else { $_server['query_string'] = ''; } // request $_request = array_merge($_get, $_post); return array('get' => $_get, 'post' => $_post, 'cookie' => $_cookie, 'server' => $_server, 'files' => $_files); } public static function status_404() { return <<
在当前目录下的视图页面:
|- process.php
|- http.php
|- streamconnection.php
|- consumer.php
|- baseobject.php
|- views/
更多编程相关知识,请访问:编程教学!!
以上就是php模拟supervisor的进程管理的详细内容。