您好,欢迎访问一九零五行业门户网

如何进行gunicorn Arbiter 源码解析

如前文所述,arbiter是gunicorn master进程的核心。arbiter主要负责管理worker进程,包括启动、监控、杀掉worker进程;同时,arbiter在某些信号发生的时候还可以热更新(reload)app应用,或者在线升级gunicorn。arbiter的核心代码在一个文件里面,代码量也不大,源码在此:https://github.com/benoitc/gunicorn。
arbiter主要有以下方法:
setup:
    处理配置项,最重要的是worker数量和worker工作模型
init_signal:
    注册信号处理函数
handle_xxx:
    各个信号具体的处理函数
kill_worker,kill_workers:
    向worker进程发信号
spawn_worker, spawn_workers:
    fork出新的worker进程
murder_workers:
    杀掉一段时间内未响应的worker进程
manage_workers:
    根据配置文件的worker数量,以及当前active的worker数量,决定是要fork还是kill worker进程
reexec:
    接收到信号sigusr2调用,在线升级gunicorn
reload:
    接收到信号sighup调用,会根据新的配置新启动worker进程,并杀掉之前的worker进程
sleep:
    在没有信号处理的时候,利用select的timeout进行sleep,可被唤醒
wakeup:
    通过向管道写消息,唤醒进程
run:
    主循环
arbiter真正被其他代码(application)调用的函数只有__init__和run方法,在一句代码里:
    arbiter(self).run()
上面代码中的self即为application实例,其中__init__调用setup进行配置项设置。下面是run方法伪代码
def run()    self.init_signal()    self.listeners = create_sockets(self.cfg, self.log)    self.manage_workers()    while true:        if no signal in sig_queue            self.sleep()        else:            handle_signal()
关于fork子进程
fork子进程的代码在 spawn_worker, 源码如下:
 arbiter.spawn_worker
主要流程:
    (1)加载worker_class并实例化(默认为同步模型 syncworker)
    (2)父进程(master进程)fork之后return,之后的逻辑都在子进程中运行
    (3)调用worker.init_process 进入循环,新航道雅思培训的所有工作都在这个循环中
    (4)循环结束之后,调用sys.exit(0)
    (5)最后,在finally中,记录worker进程的退出
下面是我自己写的一点代码,把主要的fork流程简化了一下
1 # prefork.py 2 import sys 3 import socket 4 import select 5 import os 6 import time 7   8 def do_sub_process(): 9     pid = os.fork()10     if pid < 0:11 print 'fork error'12 sys.exit(-1)13 elif pid > 0:14         print 'fork sub process %d'  % pid15         return16  17     # must be child process18     time.sleep(1)19     print 'sub process will exit', os.getpid(), os.getppid()20     sys.exit(0)21  22 def main():23     sub_num = 224     for i in range(sub_num):25         do_sub_process()26     time.sleep(10)27     print 'main process will exit', os.getpid()28  29 if __name__ == '__main__':30     main()
在测试环境下输出:
fork sub process 9601
fork sub process 9602
sub process will exit 9601 9600
sub process will exit 9602 9600
main process will exit 9600
需要注意的是第20行调用了sys.exit, 保证子进程的结束,否则会继续main函数中for循环,以及之后的逻辑。注释掉第19行重新运行,看输出就明白了。
关于kill子进程
master进程要kill worker进程就很简单了,直接发信号,源码如下:
1     def kill_worker(self, pid, sig): 2         \ 3         kill a worker 4  5         :attr pid: int, worker pid 6         :attr sig: `signal.sig*` value 7           8         try: 9             os.kill(pid, sig)10         except oserror as e:11             if e.errno == errno.esrch:12                 try:13                     worker = self.workers.pop(pid)14                     worker.tmp.close()15                     self.cfg.worker_exit(self, worker)16                     return17                 except (keyerror, oserror):18                     return19             raise
关于sleep与wakeup
我们再来看看arbiter的sleep和wakeup。arbiter在没有信号需要处理的时候会sleep,当然,不是真正调用time.sleep,否则信号来了也不能第一时间处理。这里得实现比较巧妙,利用了管道和select的timeout。看代码就知道了
def sleep(self):        \        sleep until pipe is readable or we timeout.        a readable pipe means a signal occurred.                    ready = select.select([self.pipe[0]], [], [], 1.0) # self.pipe = os.pipe()            if not ready[0]:                 return            while os.read(self.pipe[0], 1):                pass
代码里面的注释写得非常清楚,要么pipe可读立即返回,要么等待超时。管道可读是因为有信号发生。这里看看pipe函数
os.pipe()
create a pipe. return a pair of file descriptors (r,w) usable for reading and writing, respectively.
那我们看一下什么时候管道可读:肯定是往管道写入的东西,这就是wakeup函数的功能
        def wakeup(self):                        wake up the arbiter by writing to the pipe                        os.write(self.pipe[1], b'.')
最后附上arbiter的信号处理:
退出,int:快速关闭
term: 优雅关机。等待工作人员完成其当前请求,直到超时。
hup:重新加载配置,用新配置启动新的工作进程,并优雅地关闭旧的工作进程。如果应用程序未预加载(使用--preload选项),gunicorn也将加载新版本。
ttin:将进程数增加一个
ttou:将进程数减少一个
usr1:重新打开日志文件
usr2:在飞行中升级gunicorn。应使用单独的术语信号终止旧进程。此信号也可用于使用预加载应用程序的新版本。
绞盘:当gunicorn被守护时,优雅地关闭工作进程。
以上就是如何进行gunicorn arbiter 源码解析的详细内容。
其它类似信息

推荐信息