Nginx--进程间的消息传递
进程间的消息传递
nginx在多进程模型中主要使用socketpair机制来实现进程间的消息传递。
ngx_spawn_process函数中初始化socketpair
{
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}
}
N个worker子进程如何知道彼此信息的?
每个进程都维护着一张ngx_processes 是全局的进程表,用来保存存活的子进程信息。
进程通过ngx_processes获取子进程pid和流管道句柄,实现进程间消息传递
ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
ngx_process_t的结构体定义
typedef struct {
ngx_pid_t pid;
/* 进程的退出状态(主要在waitpid中进行处理) */
int status;
/* 进程channel(也就是通过socketpair创建的两个句柄) */
ngx_socket_t channel[2];
/* 进程的执行函数(也就是每次spawn,子进程所要执行的那个函数). */
ngx_spawn_proc_pt proc;
void *data;
char *name;
/* 进程的几个状态 */
unsigned respawn:1;
unsigned just_spawn:1;
unsigned detached:1;
unsigned exiting:1;
unsigned exited:1;
} ngx_process_t;
父进程创建子进程
父进程调用ngx_start_work_processes函数中,并循环调用ngx_spawn_process函数生成N个worker的子进程,更新ngx_processes进程表
for (i = 0; i < n; i++) {
ngx_spawn_process(cycle, ngx_worker_process_cycle,
(void *) (intptr_t) i, "worker process", type);
}
父进程每调用一次ngx_spawn_process函数后,将新fork的子进程pid和流管道的句柄channel[0]复制给ch变量(ngx_channel_t),
然后调用ngx_pass_open_channel函数把ch消息传递给之前创建的子进程。
{
ch.command = NGX_CMD_OPEN_CHANNEL;
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
}
传递新子进程信息给存活的子进程
进程间消息格式定义ngx_channel_t结构体,其定义
typedef struct {
/* 对端将接受到的命令 */
ngx_uint_t command;
/* 进程id */
ngx_pid_t pid;
/* 在全局ngx_processess数组中的位置 */
ngx_int_t slot;
/* 传递的fd */
ngx_fd_t fd;
} ngx_channel_t;
父进程调用ngx_pass_open_channel函数遍历ngx_processes进程表找到存活的子进程们并发送消息(ngx_channel_t)给他们
static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{
ngx_int_t i;
for (i = 0; i < ngx_last_process; i++) {
if (i == ngx_process_slot
|| ngx_processes[i].pid == -1
|| ngx_processes[i].channel[0] == -1)
{
continue;
}
/* 去掉部分代码 */
/* TODO: NGX_AGAIN */
/* 把ch的信息发给前面的子进程们 */
ngx_write_channel(ngx_processes[i].channel[0],
ch, sizeof(ngx_channel_t), cycle->log);
}
}
存活的子进程接受消息
存活的子进程调用ngx_channel_handle函数(管道可读事件捕捉函数),读取消息并解析成ngx_channel_t,并根据command做相应的处理.
在ngx_start_work_processes函数中已知ch.command = NGX_CMD_OPEN_CHANNEL;
存活的子进程收到信息更新自己的ngx_processes进程表,子进程就得到新创建的子进程的信息,子进程间就可以通信。
static void
ngx_channel_handler(ngx_event_t *ev)
{
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
/* 去掉部分代码 */
for ( ;; ) {
n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
/* 去掉部分代码 */
switch (ch.command) {
/* 去掉部分代码 */
case NGX_CMD_OPEN_CHANNEL:
/* 进程更新自己的ngx_processes进程表,记录其他子进程的相关信息 */
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
case NGX_CMD_CLOSE_CHANNEL:
if (close(ngx_processes[ch.slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"close() channel failed");
}
ngx_processes[ch.slot].channel[0] = -1;
break;
}
}
}