您好,欢迎访问一九零五行业门户网

nginx 源码学习笔记(二十一)—— event 模块二 ——事件驱动核心ngx_process_events_and_timers

首先继续回忆下,之前子线程执行操作里面有一个未涉及的内容ngx_process_events_and_timers,今天我们就来研究下这个函数。
本篇文章来自于:http://blog.csdn.net/lengzijian/article/details/7601730
先来看一下第十九节的部分截图:
今天主要讲解的就是事件驱动函数,图中的红色部分:
[cpp] view plaincopyprint?
src/event/ngx_event.c    void  ngx_process_events_and_timers(ngx_cycle_t *cycle)  {      ngx_uint_t  flags;      ngx_msec_t  timer, delta;        if (ngx_timer_resolution) {          timer = ngx_timer_infinite;          flags = 0;        } else {          timer = ngx_event_find_timer();          flags = ngx_update_time;      }            /*     ngx_use_accept_mutex变量代表是否使用accept互斥体     默认是使用,可以通过accept_mutex off;指令关闭;     accept mutex 的作用就是避免惊群,同时实现负载均衡     */      if (ngx_use_accept_mutex) {                    /*         ngx_accept_disabled变量在ngx_event_accept函数中计算。         如果ngx_accept_disabled大于0,就表示该进程接受的链接过多,         因此放弃一次争抢accept mutex的机会,同时将自己减一。         然后,继续处理已有连接上的事件。         nginx就利用这一点实现了继承关于连接的基本负载均衡。         */          if (ngx_accept_disabled > 0) {              ngx_accept_disabled--;            } else {              /*             尝试锁accept mutex,只有成功获取锁的进程,才会将listen套接字放到epoll中。             因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在epoll_wait时,             才不会惊群现象。             */              if (ngx_trylock_accept_mutex(cycle) == ngx_error) {                  return;              }                if (ngx_accept_mutex_held) {                  /*                 如果进程获得了锁,将添加一个 ngx_post_events 标志。                 这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。                 因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁,                 导致其他进程无法获取锁,这样accept的效率就低了。                 */                  flags |= ngx_post_events;                } else {                  /*                 没有获得所得进程,当然不需要ngx_post_events标志。                 但需要设置延时多长时间,再去争抢锁。                 */                  if (timer == ngx_timer_infinite                      || timer > ngx_accept_mutex_delay)                  {                      timer = ngx_accept_mutex_delay;                  }              }          }      }        delta = ngx_current_msec;            /*接下来,epoll要开始wait事件,     ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数     这里之后会详细讲解的哦     */      (void) ngx_process_events(cycle, timer, flags);      //统计本次wait事件的耗时      delta = ngx_current_msec - delta;        ngx_log_debug1(ngx_log_debug_event, cycle->log, 0,                     timer delta: %m, delta);        /*     ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。     前文提到的ngx_post_events标志被使用后,会将所有的accept事件暂存到这个队列     */      if (ngx_posted_accept_events) {          ngx_event_process_posted(cycle, &ngx_posted_accept_events);      }      //所有accept事件处理完之后,如果持有锁的话,就释放掉。      if (ngx_accept_mutex_held) {          ngx_shmtx_unlock(&ngx_accept_mutex);      }            /*     delta是之前统计的耗时,存在毫秒级的耗时,就对所有时间的timer进行检查,     如果timeout 就从time rbtree中删除到期的timer,同时调用相应事件的handler函数处理     */      if (delta) {          ngx_event_expire_timers();      }        ngx_log_debug1(ngx_log_debug_event, cycle->log, 0,                     posted events %p, ngx_posted_events);        /*     处理普通事件(连接上获得的读写事件),     因为每个事件都有自己的handler方法,     */      if (ngx_posted_events) {          if (ngx_threaded) {              ngx_wakeup_worker_thread(cycle);            } else {              ngx_event_process_posted(cycle, &ngx_posted_events);          }      }  }  
之前有说过accept事件,其实他就是监听套接口上是否有新来的事件,下面介绍下accept时间的handler方法:
ngx_event_accept:
[cpp] view plaincopyprint?
src/event/ngx_event_accept.c    void  ngx_event_accept(ngx_event_t *ev)  {      socklen_t          socklen;      ngx_err_t          err;      ngx_log_t         *log;      ngx_socket_t       s;      ngx_event_t       *rev, *wev;      ngx_listening_t   *ls;      ngx_connection_t  *c, *lc;      ngx_event_conf_t  *ecf;      u_char             sa[ngx_sockaddrlen];            //省略部分代码        lc = ev->data;      ls = lc->listening;      ev->ready = 0;        ngx_log_debug2(ngx_log_debug_event, ev->log, 0,                     accept on %v, ready: %d, &ls->addr_text, ev->available);        do {          socklen = ngx_sockaddrlen;          //accept一个新的连接          s = accept(lc->fd, (struct sockaddr *) sa, &socklen);          //省略部分代码                    /*         accept到一个新的连接后,就重新计算ngx_accept_disabled的值,         它主要是用来做负载均衡,之前有提过。                  这里,我们可以看到他的就只方式         “总连接数的八分之一   -   剩余的连接数“         总连接指每个进程设定的最大连接数,这个数字可以再配置文件中指定。                  所以每个进程到总连接数的7/8后,ngx_accept_disabled就大于零,连接超载了                  */            ngx_accept_disabled = ngx_cycle->connection_n / 8                                - ngx_cycle->free_connection_n;                    //获取一个connection          c = ngx_get_connection(s, ev->log);            //为新的链接创建起一个memory pool          //连接关闭的时候,才释放pool            c->pool = ngx_create_pool(ls->pool_size, ev->log);          if (c->pool == null) {              ngx_close_accepted_connection(c);              return;          }            c->sockaddr = ngx_palloc(c->pool, socklen);          if (c->sockaddr == null) {              ngx_close_accepted_connection(c);              return;          }            ngx_memcpy(c->sockaddr, sa, socklen);            log = ngx_palloc(c->pool, sizeof(ngx_log_t));          if (log == null) {              ngx_close_accepted_connection(c);              return;          }            /* set a blocking mode for aio and non-blocking mode for others */            if (ngx_inherited_nonblocking) {              if (ngx_event_flags & ngx_use_aio_event) {                  if (ngx_blocking(s) == -1) {                      ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,                                    ngx_blocking_n  failed);                      ngx_close_accepted_connection(c);                      return;                  }              }            } else {              //我们使用epoll模型,这里我们设置连接为nonblocking              if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) {                  if (ngx_nonblocking(s) == -1) {                      ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,                                    ngx_nonblocking_n  failed);                      ngx_close_accepted_connection(c);                      return;                  }              }          }            *log = ls->log;          //初始化新的连接          c->recv = ngx_recv;          c->send = ngx_send;          c->recv_chain = ngx_recv_chain;          c->send_chain = ngx_send_chain;            c->log = log;          c->pool->log = log;            c->socklen = socklen;          c->listening = ls;          c->local_sockaddr = ls->sockaddr;            c->unexpected_eof = 1;    #if (ngx_have_unix_domain)          if (c->sockaddr->sa_family == af_unix) {              c->tcp_nopush = ngx_tcp_nopush_disabled;              c->tcp_nodelay = ngx_tcp_nodelay_disabled;  #if (ngx_solaris)              /* solaris's sendfilev() supports af_nca, af_inet, and af_inet6 */              c->sendfile = 0;  #endif          }  #endif            rev = c->read;          wev = c->write;            wev->ready = 1;            if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) {              /* rtsig, aio, iocp */              rev->ready = 1;          }            if (ev->deferred_accept) {              rev->ready = 1;  #if (ngx_have_kqueue)              rev->available = 1;  #endif          }            rev->log = log;          wev->log = log;            /*          * todo: mt: - ngx_atomic_fetch_add()          *             or protection by critical section or light mutex          *          * todo: mp: - allocated in a shared memory          *           - ngx_atomic_fetch_add()          *             or protection by critical section or light mutex          */            c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);                    if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) {              if (ngx_add_conn(c) == ngx_error) {                  ngx_close_accepted_connection(c);                  return;              }          }            log->data = null;          log->handler = null;                    /*         这里listen handler很重要,它将完成新连接的最后初始化工作,         同时将accept到的新的连接放入epoll中;挂在这个handler上的函数,         就是ngx_http_init_connection 在之后http模块中在详细介绍         */          ls->handler(c);            if (ngx_event_flags & ngx_use_kqueue_event) {              ev->available--;          }        } while (ev->available);  }  
accpt事件的handler方法也就是如此了。之后就是每个连接的读写事件handler方法,这一部分会直接将我们引入http模块,我们还不急,还要学习下nginx经典模块epoll。
以上就介绍了nginx 源码学习笔记(二十一)—— event 模块二 ——事件驱动核心ngx_process_events_and_timers,包括了队列方面的内容,希望对php教程有兴趣的朋友有所帮助。
其它类似信息

推荐信息