文章目录
  1. 1. 进程间的消息传递
  2. 2. N个worker子进程如何知道彼此信息的?
    1. 2.1. 父进程创建子进程
    2. 2.2. 传递新子进程信息给存活的子进程
    3. 2.3. 存活的子进程接受消息

进程间的消息传递

nginx在多进程模型中主要使用socketpair机制来实现进程间的消息传递。
ngx_spawn_process函数中初始化socketpair

{
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
    {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "socketpair() failed while spawning \"%s\"", name);
        return NGX_INVALID_PID;
    }
}

N个worker子进程如何知道彼此信息的?

每个进程都维护着一张ngx_processes 是全局的进程表,用来保存存活的子进程信息。

进程通过ngx_processes获取子进程pid和流管道句柄,实现进程间消息传递

ngx_process_t    ngx_processes[NGX_MAX_PROCESSES];

ngx_process_t的结构体定义

typedef struct {
    ngx_pid_t           pid;
    /* 进程的退出状态(主要在waitpid中进行处理) */
    int                 status;
    /* 进程channel(也就是通过socketpair创建的两个句柄) */
    ngx_socket_t        channel[2];

    /* 进程的执行函数(也就是每次spawn,子进程所要执行的那个函数). */
    ngx_spawn_proc_pt   proc;
    void               *data;
    char               *name;

    /* 进程的几个状态 */
    unsigned            respawn:1;
    unsigned            just_spawn:1;
    unsigned            detached:1;
    unsigned            exiting:1;
    unsigned            exited:1;
} ngx_process_t;

父进程创建子进程

父进程调用ngx_start_work_processes函数中,并循环调用ngx_spawn_process函数生成N个worker的子进程,更新ngx_processes进程表

for (i = 0; i < n; i++) {
    ngx_spawn_process(cycle, ngx_worker_process_cycle, 
            (void *) (intptr_t) i, "worker process", type);
}

父进程每调用一次ngx_spawn_process函数后,将新fork的子进程pid和流管道的句柄channel[0]复制给ch变量(ngx_channel_t),

然后调用ngx_pass_open_channel函数把ch消息传递给之前创建的子进程。

{
    ch.command = NGX_CMD_OPEN_CHANNEL;
    ch.pid = ngx_processes[ngx_process_slot].pid;
    ch.slot = ngx_process_slot;
    ch.fd = ngx_processes[ngx_process_slot].channel[0];

    ngx_pass_open_channel(cycle, &ch);
}

传递新子进程信息给存活的子进程

进程间消息格式定义ngx_channel_t结构体,其定义

typedef struct {
    /* 对端将接受到的命令 */
     ngx_uint_t  command;
     /* 进程id */
     ngx_pid_t   pid;
     /* 在全局ngx_processess数组中的位置 */
     ngx_int_t   slot;
     /* 传递的fd */
     ngx_fd_t    fd;
} ngx_channel_t;

父进程调用ngx_pass_open_channel函数遍历ngx_processes进程表找到存活的子进程们并发送消息(ngx_channel_t)给他们

static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{
    ngx_int_t  i;

    for (i = 0; i < ngx_last_process; i++) {

        if (i == ngx_process_slot
            || ngx_processes[i].pid == -1
            || ngx_processes[i].channel[0] == -1)
        {
            continue;
        }
        /* 去掉部分代码 */
        /* TODO: NGX_AGAIN */
        /* 把ch的信息发给前面的子进程们 */
        ngx_write_channel(ngx_processes[i].channel[0],
                          ch, sizeof(ngx_channel_t), cycle->log);
    }
}

存活的子进程接受消息

存活的子进程调用ngx_channel_handle函数(管道可读事件捕捉函数),读取消息并解析成ngx_channel_t,并根据command做相应的处理.

在ngx_start_work_processes函数中已知ch.command = NGX_CMD_OPEN_CHANNEL;

存活的子进程收到信息更新自己的ngx_processes进程表,子进程就得到新创建的子进程的信息,子进程间就可以通信。

static void
ngx_channel_handler(ngx_event_t *ev)
{
    ngx_int_t          n;
    ngx_channel_t      ch;
    ngx_connection_t  *c;

    /* 去掉部分代码 */
    for ( ;; ) {
        n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
        /* 去掉部分代码 */
        switch (ch.command) {
        /* 去掉部分代码 */
        case NGX_CMD_OPEN_CHANNEL:
         /* 进程更新自己的ngx_processes进程表,记录其他子进程的相关信息 */
            ngx_processes[ch.slot].pid = ch.pid;
            ngx_processes[ch.slot].channel[0] = ch.fd;
            break;
        case NGX_CMD_CLOSE_CHANNEL:

            if (close(ngx_processes[ch.slot].channel[0]) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                              "close() channel failed");
            }
            ngx_processes[ch.slot].channel[0] = -1;
            break;
        }
    }
}