本文共 20512 字,大约阅读时间需要 68 分钟。
nginx源码分析
nginx-1.11.1参考书籍《深入理解nginx模块开发与架构解析》
Nginx的master与worker工作模式
在生成环境中的Nginx启动模式基本都是以master/worker为主进行启动运行,通过master/worker的工作方式可以利用多核系统的并发处理能力,master主要就是负责与worker进程进行通信,控制并负载每个worker进程的连接处理以达到worker进程的负载均衡,本文就开始分析一下该模式
master的启动过程
信号相关初始化
在Nginx中的工作方式,进程间的通信基于信号的较多,通过信号来实现相关进程的管理工作,在初始化的过程中,有注册了相关信号的初始化。
ngx_signal_t signals[] = { { ngx_signal_value(NGX_RECONFIGURE_SIGNAL), "SIG" ngx_value(NGX_RECONFIGURE_SIGNAL), "reload", ngx_signal_handler }, // 重新加载信号处理 { ngx_signal_value(NGX_REOPEN_SIGNAL), "SIG" ngx_value(NGX_REOPEN_SIGNAL), "reopen", ngx_signal_handler }, // 日志重新打开信号处理 { ngx_signal_value(NGX_NOACCEPT_SIGNAL), "SIG" ngx_value(NGX_NOACCEPT_SIGNAL), "", ngx_signal_handler }, { ngx_signal_value(NGX_TERMINATE_SIGNAL), "SIG" ngx_value(NGX_TERMINATE_SIGNAL), // 停止信号 "stop", ngx_signal_handler }, { ngx_signal_value(NGX_SHUTDOWN_SIGNAL), "SIG" ngx_value(NGX_SHUTDOWN_SIGNAL), // 退出信号 "quit", ngx_signal_handler }, { ngx_signal_value(NGX_CHANGEBIN_SIGNAL), "SIG" ngx_value(NGX_CHANGEBIN_SIGNAL), "", ngx_signal_handler }, { SIGALRM, "SIGALRM", "", ngx_signal_handler }, { SIGINT, "SIGINT", "", ngx_signal_handler }, { SIGIO, "SIGIO", "", ngx_signal_handler }, { SIGCHLD, "SIGCHLD", "", ngx_signal_handler }, { SIGSYS, "SIGSYS, SIG_IGN", "", SIG_IGN }, { SIGPIPE, "SIGPIPE, SIG_IGN", "", SIG_IGN }, { 0, NULL, "", NULL }};ngx_int_tngx_init_signals(ngx_log_t *log) // 注册相关信号处理函数{ ngx_signal_t *sig; struct sigaction sa; for (sig = signals; sig->signo != 0; sig++) { // 遍历信号列表 ngx_memzero(&sa, sizeof(struct sigaction)); // 内存清零 sa sa.sa_handler = sig->handler; // 获取处理的handler sigemptyset(&sa.sa_mask); if (sigaction(sig->signo, &sa, NULL) == -1) { // 设置信号处理#if (NGX_VALGRIND) ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "sigaction(%s) failed, ignored", sig->signame);#else ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "sigaction(%s) failed", sig->signame); return NGX_ERROR;#endif } } return NGX_OK; // 信号处理成功}
主要是注册了相关的信号处理函数ngx_signal_handler,
voidngx_signal_handler(int signo){ char *action; ngx_int_t ignore; ngx_err_t err; ngx_signal_t *sig; ignore = 0; err = ngx_errno; for (sig = signals; sig->signo != 0; sig++) { // 判断信号是否在列表中 if (sig->signo == signo) { // 住过找到 break; } } ngx_time_sigsafe_update(); // 更新时间 action = ""; switch (ngx_process) { case NGX_PROCESS_MASTER: case NGX_PROCESS_SINGLE: switch (signo) { case ngx_signal_value(NGX_SHUTDOWN_SIGNAL): // 判断相关信号并赋值相关标志位 ngx_quit = 1; action = ", shutting down"; break; case ngx_signal_value(NGX_TERMINATE_SIGNAL): case SIGINT: ngx_terminate = 1; action = ", exiting"; break; ...}
信号初始化完成之后,就继续执行master的初始化。
master/worker的初始化
在初始化完成之后,就进入ngx_master_process_cycle的执行,
voidngx_master_process_cycle(ngx_cycle_t *cycle){ char *title; u_char *p; size_t size; ngx_int_t i; ngx_uint_t n, sigio; sigset_t set; struct itimerval itv; ngx_uint_t live; ngx_msec_t delay; ngx_listening_t *ls; ngx_core_conf_t *ccf; sigemptyset(&set); // 设置信号量处理 sigaddset(&set, SIGCHLD); sigaddset(&set, SIGALRM); sigaddset(&set, SIGIO); sigaddset(&set, SIGINT); sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL)); if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { // 设置信号量处理 屏蔽注册的信号 ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "sigprocmask() failed"); } sigemptyset(&set); // 清除信号数据 size = sizeof(master_process); for (i = 0; i < ngx_argc; i++) { size += ngx_strlen(ngx_argv[i]) + 1; } title = ngx_pnalloc(cycle->pool, size); // 获取内存 if (title == NULL) { /* fatal */ exit(2); } p = ngx_cpymem(title, master_process, sizeof(master_process) - 1); // 拷贝进程名称信息 for (i = 0; i < ngx_argc; i++) { *p++ = ' '; p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size); } ngx_setproctitle(title); // 设置进程名称 ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 获取配置信息 ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); // 启动worker进程 ngx_start_cache_manager_processes(cycle, 0); // 启动cache进程 ngx_new_binary = 0; delay = 0; sigio = 0; live = 1; for ( ;; ) { if (delay) { // 是否有过期事件 if (ngx_sigalrm) { // 是否是定时器到期信号 sigio = 0; delay *= 2; ngx_sigalrm = 0; // 置零 } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "termination cycle: %M", delay); itv.it_interval.tv_sec = 0; // 设置定时器 itv.it_interval.tv_usec = 0; itv.it_value.tv_sec = delay / 1000; itv.it_value.tv_usec = (delay % 1000 ) * 1000; if (setitimer(ITIMER_REAL, &itv, NULL) == -1) { // 重新设置定时器 ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "setitimer() failed"); } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend"); sigsuspend(&set); // 阻塞等待信号发生 ngx_time_update(); // 更新时间 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "wake up, sigio %i", sigio); if (ngx_reap) { ngx_reap = 0; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children"); live = ngx_reap_children(cycle); // 有子进程结束检查worker状态 } if (!live && (ngx_terminate || ngx_quit)) { ngx_master_process_exit(cycle); // 主进程退出 } if (ngx_terminate) { // 强制关闭 if (delay == 0) { delay = 50; } if (sigio) { sigio--; continue; } sigio = ccf->worker_processes + 2 /* cache processes */; if (delay > 1000) { ngx_signal_worker_processes(cycle, SIGKILL); // 超时就强行关闭 } else { ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_TERMINATE_SIGNAL)); // 发送终止信号 } continue; } if (ngx_quit) { ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); // 让子进程退出 ls = cycle->listening.elts; for (n = 0; n < cycle->listening.nelts; n++) { // 关闭连接 if (ngx_close_socket(ls[n].fd) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, ngx_close_socket_n " %V failed", &ls[n].addr_text); } } cycle->listening.nelts = 0; // 连接数置空 continue; } if (ngx_reconfigure) { // 重新加载配置 ngx_reconfigure = 0; // 置位0 if (ngx_new_binary) { ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); ngx_start_cache_manager_processes(cycle, 0); ngx_noaccepting = 0; continue; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring"); cycle = ngx_init_cycle(cycle); // 重新初始化配置 if (cycle == NULL) { cycle = (ngx_cycle_t *) ngx_cycle; continue; } ngx_cycle = cycle; // 重置该变量 ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 获取配置文件 ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_JUST_RESPAWN); // 开始子进程 ngx_start_cache_manager_processes(cycle, 1); // 开启cache /* allow new processes to start */ ngx_msleep(100); live = 1; ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); // 停止已经存在的工作子进程 } if (ngx_restart) { ngx_restart = 0; ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); // 重启 ngx_start_cache_manager_processes(cycle, 0); live = 1; } if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, ccf->user); ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_REOPEN_SIGNAL)); // 重新打开日志文件 } if (ngx_change_binary) { ngx_change_binary = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary"); ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv); } if (ngx_noaccept) { ngx_noaccept = 0; ngx_noaccepting = 1; // 重置标志位 ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); // 给子进程发送信号 } }}
该函数,主要就是先屏蔽不需要关注的信号,然后通过ngx_start_worker_processes函数启动子进程,启动完子进程之后,就进入for的死循环中,该循环的主要作用就是等待相关信号的发生,等待定时器的事件发生,等待对该进程重启、停止等相关信号操作的事件发生,worker的管理工作都集中于for循环的列表中进行的操作。至此master的主要工作就分析完成。
worker的启动运行
worker进程的启动主要是通过ngx_start_worker_processes该函数开始的,
static voidngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type){ ngx_int_t i; ngx_channel_t ch; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes"); ngx_memzero(&ch, sizeof(ngx_channel_t)); // 申请管道空间 ch.command = NGX_CMD_OPEN_CHANNEL; // 打开管道 for (i = 0; i < n; i++) { // 生成多少个worker进程 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); // 生成worker子进程并设置进程名称 ch.pid = ngx_processes[ngx_process_slot].pid; // 获取PID ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; ngx_pass_open_channel(cycle, &ch); // 打开管道 }}
此时继续查看ngx_spawn_process来查看子进程的生成过程;
ngx_pid_tngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn){ u_long on; ngx_pid_t pid; ngx_int_t s; if (respawn >= 0) { s = respawn; } else { for (s = 0; s < ngx_last_process; s++) { // 找到ngx_process中一个可用的位置 if (ngx_processes[s].pid == -1) { break; } } if (s == NGX_MAX_PROCESSES) { // 如果大于最大的则报错 ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } if (respawn != NGX_PROCESS_DETACHED) { // 相关管道的操作 /* Solaris 9 still has no AF_LOCAL */ if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } on = 1; if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ngx_channel = ngx_processes[s].channel[1]; } else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; // 赋值s pid = fork(); // 生成子进程 switch (pid) { case -1: ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; // 如果出错则返回 case 0: ngx_pid = ngx_getpid(); // 子进程获取pid proc(cycle, data); // 执行子进程 break; default: break; } ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid); ngx_processes[s].pid = pid; // 父进程获取当前执行的pid ngx_processes[s].exited = 0; // 是否退出设置为0 if (respawn >= 0) { return pid; } ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; switch (respawn) { case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break; case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break; case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; } if (s == ngx_last_process) { ngx_last_process++; } return pid; // 返回PID}
其中在生成子进程之后,就调用了proc方法来执行子进程的相关操作,此时的proc就是对应的ngx_worker_process_cycle函数;
static voidngx_worker_process_cycle(ngx_cycle_t *cycle, void *data){ ngx_int_t worker = (intptr_t) data; ngx_process = NGX_PROCESS_WORKER; ngx_worker = worker; ngx_worker_process_init(cycle, worker); // 初始化 ngx_setproctitle("worker process"); // 设置进程名称 for ( ;; ) { if (ngx_exiting) { // 是否退出 ngx_event_cancel_timers(); // 取消事件定时器 if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); // 退出 } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle"); ngx_process_events_and_timers(cycle); // 处理请求与连接 if (ngx_terminate) { // 是否终止 ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } if (ngx_quit) { // 是否退出 ngx_quit = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down"); ngx_setproctitle("worker process is shutting down"); if (!ngx_exiting) { ngx_exiting = 1; ngx_close_listening_sockets(cycle); // 关闭连接 ngx_close_idle_connections(cycle); } } if (ngx_reopen) { // 重新打开日志文件 ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, -1); } }}
由该函数可知,此时worker子进程就工作在for循环中,通过调用ngx_process_events_and_timers函数进行请求的处理与事件的处理。worker的工作就此开始运行,后续将进一步分析worker启动过程中的相关初始化与事件处理机制。
总结
本文大致描述了nginx在master/worker工作模式下,master的工作流程的启动与worker工作进程的启动过程,master与worker之间也初始化了管道通信,也注册了信号相关的处理,worker在接受到master相关的信号时会执行相关操作,本文只是简单的描述了启动过程与基本的工作机制,后续还将继续分析。鉴于本人才疏学浅,如有疏漏请批评指正。
转载地址:https://blog.csdn.net/qq_33339479/article/details/89179231 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!