您好,欢迎访问一九零五行业门户网

PHP模拟supervisor的进程管理

推荐:《php视频教程》
前言模拟supervisor进程管理demo(简易实现)
没错,是造轮子!目的在于学习!
截图:
在图中自己实现了一个copy子进程的功能。如果用在amqp增减消费者时,我觉得应该会很有用。
实现1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制amqp的消费者进程管理服务。(2)模拟crontab定时服务。
知识点代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。
代码由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。
主进程代码:process.php
<?phprequire_once __dir__ . '/consumer.php';require_once __dir__ . '/streamconnection.php';require_once __dir__ . '/http.php';class process{ /** * 待启动的消费者数组 */ protected $consumers = array(); protected $childpids = array(); const ppid_file = __dir__ . '/process'; protected $serializerconsumer; public function __construct() { $this->consumers = $this->getconsumers();    }    // 这里是个demo,实际可以用读取配置文件的方式。    public function getconsumers()    {        $consumer = new consumer([            'program' => 'test',            'command' => '/usr/bin/php test.php',            'directory' => __dir__,            'logfile' => __dir__ . '/test.log',            'uniqid' => uniqid(),            'auto_restart' => false,        ]);        return [            $consumer->uniqid => $consumer,        ];    }    public function run()    {        if (empty($this->consumers)) {            // consumer empty            return;        }        if ($this->_notifymaster()) {            // master alive            return;        }        $pid = pcntl_fork();        if ($pid < 0) { exit; } elseif ($pid > 0) {            exit;        }        if (!posix_setsid()) {            exit;        }        $stream = new streamconnection('tcp://0.0.0.0:7865');        @cli_set_process_title('amqp master process');        // 将主进程id写入文件        file_put_contents(self::ppid_file, getmypid());        // master进程继续        while (true) {            $this->init();            pcntl_signal_dispatch();            $this->waitpid();            // 如果子进程被全部回收,则主进程退出            // if (empty($this->childpids)) {            //     $stream->close($stream->getsocket());            //     break;            // }            $stream->accept(function ($uniqid, $action) {                $this->handle($uniqid, $action);                return $this->display();            });        }    }    protected function init()    {        foreach ($this->consumers as &$c) {            switch ($c->state) {                case consumer::running:                case consumer::stop:                    break;                case consumer::nominal:                case consumer::starting:                    $this->fork($c);                    break;                case consumer::stoping:                    if ($c->pid && posix_kill($c->pid, sigterm)) {                        $this->reset($c, consumer::stop);                    }                    break;                case consumer::restart:                    if (empty($c->pid)) {                        $this->fork($c);                        break;                    }                    if (posix_kill($c->pid, sigterm)) {                        $this->reset($c, consumer::stop);                        $this->fork($c);                    }                    break;                default:                    break;            }        }    }    protected function reset(consumer $c, $state)    {        $c->pid = '';        $c->uptime = '';        $c->state = $state;        $c->process = null;    }    protected function waitpid()    {        foreach ($this->childpids as $uniqid => $pid) {            $result = pcntl_waitpid($pid, $status, wnohang);            if ($result == $pid || $result == -1) {                unset($this->childpids[$uniqid]);                $c = &$this->consumers[$uniqid];                $state = pcntl_wifexited($status) ? consumer::exited : consumer::stop;                $this->reset($c, $state);            }        }    }    /**     * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程     */    private function _notifymaster()    {        $ppid = file_get_contents(self::ppid_file );        $isalive = $this->checkprocessalive($ppid);        if (!$isalive) return false;        return true;    }    public function checkprocessalive($pid)    {        if (empty($pid)) return false;        $pidinfo = `ps co pid {$pid} | xargs`;        $pidinfo = trim($pidinfo);        $pattern = /.*?pid.*?(\d+).*?/;        preg_match($pattern, $pidinfo, $matches);        return empty($matches) ? false : ($matches[1] == $pid ? true : false);    }    /**     * fork一个新的子进程     */    protected function fork(consumer $c)    {        $descriptorspec = [2 => ['file', $c->logfile, 'a'],];        $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);        if ($process) {            $ret = proc_get_status($process);            if ($ret['running']) {                $c->state = consumer::running;                $c->pid = $ret['pid'];                $c->process = $process;                $c->uptime = date('m-d h:i');                $this->childpids[$c->uniqid] = $ret['pid'];            } else {                $c->state = consumer::exited;                proc_close($process);            }        } else {            $c->state = consumer::error;        }        return $c;    }    public function display()    {        $location = 'http://127.0.0.1:7865';        $basepath = http::$basepath;        $scriptname = isset($_server['script_name']) &&            !empty($_server['script_name']) &&            $_server['script_name'] != '/' ? $_server['script_name'] : '/index.php';        if ($scriptname == '/index.html') {            return http::status_301($location);        }        $sourcepath = $basepath . $scriptname;        if (!is_file($sourcepath)) {            return http::status_404();        }        ob_start();        include $sourcepath;        $response = ob_get_contents();        ob_clean();        return http::status_200($response);    }    public function handle($uniqid, $action)    {        if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {            return;        }        switch ($action) {            case 'refresh':                break;            case 'restartall':                $this->killall(true);                break;            case 'stopall':                $this->killall();                break;            case 'stop':                $c = &$this->consumers[$uniqid];                if ($c->state != consumer::running) break;                $c->state = consumer::stoping;                break;            case 'start':                $c = &$this->consumers[$uniqid];                if ($c->state == consumer::running) break;                $c->state = consumer::starting;                break;            case 'restart':                $c = &$this->consumers[$uniqid];                $c->state = consumer::restart;                break;            case 'copy':                $c = $this->consumers[$uniqid];                $newc = clone $c;                $newc->uniqid = uniqid('c');                $newc->state = consumer::nominal;                $newc->pid = '';                $this->consumers[$newc->uniqid] = $newc;                break;            default:                break;        }    }    protected function killall($restart = false)    {        foreach ($this->consumers as &$c) {            $c->state = $restart ? consumer::restart : consumer::stoping;        }    }}$cli = new process();$cli->run();
consumer消费者对象
<?phprequire_once __dir__ . '/baseobject.php';class consumer extends baseobject{ /** 开启多少个消费者 */ public $numprocs = 1; /** 当前配置的唯一标志 */ public $program; /** 执行的命令 */ public $command; /** 当前工作的目录 */ public $directory; /** 通过 $qos $queuename $duplicate 生成的 $queue */ public $queue; /** 程序执行日志记录 */ public $logfile = ''; /** 消费进程的唯一id */ public $uniqid; /** 进程idpid */ public $pid; /** 进程状态 */ public $state = self::nominal; /** 自启动 */ public $auto_restart = false; public $process; /** 启动时间 */ public $uptime; const running = 'running'; const stop = 'stoped'; const nominal = 'nominal'; const restart = 'restart'; const stoping = 'stoping'; const starting = 'stating'; const error = 'error'; const blocked = 'blocked'; const exited = 'exited'; const fatel = 'fatel';}
stream相关代码:streamconnection.php
<?phpclass streamconnection{ protected $socket; protected $timeout = 2; //s protected $client; public function __construct($host) { $this->socket = $this->connect($host);    }    public function connect($host)    {        $socket = stream_socket_server($host, $errno, $errstr);        if (!$socket) {            exit('stream error');        }        stream_set_timeout($socket, $this->timeout);        stream_set_chunk_size($socket, 1024);        stream_set_blocking($socket, false);        $this->client = [$socket];        return $socket;    }    public function accept(closure $callback)    {        $read = $this->client;        if (stream_select($read, $write, $except, 1) < 1) return; if (in_array($this->socket, $read)) {            $cs = stream_socket_accept($this->socket);            $this->client[] = $cs;        }        foreach ($read as $s) {            if ($s == $this->socket) continue;            $header = fread($s, 1024);            if (empty($header)) {                $index = array_search($s, $this->client);                if ($index)                    unset($this->client[$index]);                $this->close($s);                continue;            }            http::parse_http($header);            $uniqid = isset($_get['uniqid']) ? $_get['uniqid'] : '';            $action = isset($_get['action']) ? $_get['action'] : '';            $response = $callback($uniqid, $action);            $this->write($s, $response);            $index = array_search($s, $this->client);            if ($index)                unset($this->client[$index]);            $this->close($s);        }    }    public function write($socket, $response)    {        $ret = fwrite($socket, $response, strlen($response));    }    public function close($socket)    {        $flag = fclose($socket);    }    public function getsocket()    {        return $this->socket;    }}
http响应代码:http.php
<?phpclass http{ public static $basepath = __dir__ . '/views'; public static $max_age = 120; //秒 /* * 函数: parse_http * 描述: 解析http协议 */ public static function parse_http($http) { // 初始化 $_post = $_get = $_cookie = $_request = $_session = $_files = array(); $globals['http_raw_post_data'] = ''; // 需要设置的变量名 $_server = array( 'query_string' => '',            'request_method' => '',            'request_uri' => '',            'server_protocol' => '',            'server_software' => '',            'server_name' => '',            'http_host' => '',            'http_user_agent' => '',            'http_accept' => '',            'http_accept_language' => '',            'http_accept_encoding' => '',            'http_cookie' => '',            'http_connection' => '',            'remote_addr' => '',            'remote_port' => '0',            'script_name' => '',            'http_referer' => '',            'content_type' => '',            'http_if_none_match' => '',        );        // 将header分割成数组        list($http_header, $http_body) = explode(\r\n\r\n, $http, 2);        $header_data = explode(\r\n, $http_header);        list($_server['request_method'], $_server['request_uri'], $_server['server_protocol']) = explode(' ', $header_data[0]);        unset($header_data[0]);        foreach ($header_data as $content) {            // \r\n\r\n            if (empty($content)) {                continue;            }            list($key, $value) = explode(':', $content, 2);            $key = strtolower($key);            $value = trim($value);            switch ($key) {                case 'host':                    $_server['http_host'] = $value;                    $tmp = explode(':', $value);                    $_server['server_name'] = $tmp[0];                    if (isset($tmp[1])) {                        $_server['server_port'] = $tmp[1];                    }                    break;                case 'cookie':                    $_server['http_cookie'] = $value;                    parse_str(str_replace('; ', '&', $_server['http_cookie']), $_cookie);                    break;                case 'user-agent':                    $_server['http_user_agent'] = $value;                    break;                case 'accept':                    $_server['http_accept'] = $value;                    break;                case 'accept-language':                    $_server['http_accept_language'] = $value;                    break;                case 'accept-encoding':                    $_server['http_accept_encoding'] = $value;                    break;                case 'connection':                    $_server['http_connection'] = $value;                    break;                case 'referer':                    $_server['http_referer'] = $value;                    break;                case 'if-modified-since':                    $_server['http_if_modified_since'] = $value;                    break;                case 'if-none-match':                    $_server['http_if_none_match'] = $value;                    break;                case 'content-type':                    if (!preg_match('/boundary=?(\s+)?/', $value, $match)) {                        $_server['content_type'] = $value;                    } else {                        $_server['content_type'] = 'multipart/form-data';                        $http_post_boundary = '--' . $match[1];                    }                    break;            }        }        // script_name        $_server['script_name'] = parse_url($_server['request_uri'], php_url_path);        // query_string        $_server['query_string'] = parse_url($_server['request_uri'], php_url_query);        if ($_server['query_string']) {            // $get            parse_str($_server['query_string'], $_get);        } else {            $_server['query_string'] = '';        }        // request        $_request = array_merge($_get, $_post);        return array('get' => $_get, 'post' => $_post, 'cookie' => $_cookie, 'server' => $_server, 'files' => $_files);    }    public static function status_404()    {        return << 待执行的脚本:test.php
在当前目录下的视图页面:
|- process.php
|- http.php
|- streamconnection.php
|- consumer.php
|- baseobject.php
|- views/
更多编程相关知识,请访问:编程教学!!
以上就是php模拟supervisor的进程管理的详细内容。
其它类似信息

推荐信息