Jabberd2 通信框架(mio)模型 (下)

之前的文章介绍了mio的数据结构模型,但是它只是定义了I/O复用的模型,主要是负责I/O端口上的读写事件监听,但真正的读(recv)和写(send)并不是由mio完成,读写的分析逻辑涉及到协议,因此jabberd2中这项任务交给了sx模块完成,源码中没有看到对sx这两个字母做出的解释,光看源码的sx目录很难看出这个模块到底具体的职责,关于它日后再说.对于mio来说, 可以把sx看成是读写的执行体, 读取到xml片段怎么分析, 要写什么内容来回复,都由sx完成.

关于如何调试jabberd2已经讲过, jabberd2将整个服务器交给了4个进程模块来执行,或许这个原因, 作者在模块中没有用多线程,靠一个主线程完成全部工作.所以整个架构里充满了函数回调,而一个回调函数可能在各种不同的情况下被调用,因此代码的修改上必须小心翼翼,这或许又使其扩展性受到了极大地约束,看这类代码非常费劲.毫无例外地, mio和sx的配合过程也是靠回调完成.

其实从callback函数的命名上可以看出它的由来,第一个单词(c2s)指明了所属的进程模块,第二个单词(router)表示这个函数承载的通信对象,第三个单词(mio)表示了调用这个回调函数的触发者,最后一个callback顾名思义了.因此c2s_router_mio_callback表明的是c2s模块里和router交互的mio模型回调函数,在jabberd2架构里这类回调函数通常都是注册在第三个单词(这里即mio)的实体中,由其触发.一个mio_callback可以管理一类对象的通信,针对router的交互注册的是c2s_router_sx_callback,针对客户端发起的请求注册的是_c2s_client_mio_callback,这些函数都在c2s.c中.

前面说到,mio不负责具体的读写操作,因此这里又引入了sx模块的回调,针对每一个(文件描述符FD)连接通信,mio_callback都会调用与其对应的sx_callback.
c2s_router_sx_callback对应于c2s_router_mio_callback,后者(mio)的读取请求操作都发给前者(sx),由于router和c2s是一对一的关系,所以进入这两个回调的连接句柄FD都是同一个,因此相对比较容易调试和理解,下图也以此为例解释.而一个c2s建立的服务端对应N个客户端,所以client_mio_callback会针对不同连接做出不同client_sx_callback调用请求.下图是以c2s和router建立连接的过程来表示.

Image

在进入while循环前首先要建立起与router的连接(_c2s_router_connect),while循环靠mio_run来驱动.在进入循环前,FD(m,fd)->type已被设成type_CONNECT_WRITE,表示首次建立连接并希望写入数据.这里要指出一点, 从_mio_read_mio_write的区别可以看出来由于采用的是Reactor模式, _mio_read实际上是延时读,_mio_write即时写.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/** start processing read events */
static void _mio_read(mio_t m, mio_fd_t fd)
{
    if(m == NULL || fd == NULL) return;

    /* if connecting, do this later */
    if(FD(m,fd)->type & type_CONNECT)
    {
        FD(m,fd)->type |= type_CONNECT_READ;
        return;
    }

    MIO_SET_READ(m, FD(m,fd));
}

/** try writing to the socket via the app */
static void _mio_write(mio_t m, mio_fd_t fd)
{
    if(m == NULL || fd == NULL) return;

    /* if connecting, do this later */
    if(FD(m,fd)->type & type_CONNECT)
    {
        FD(m,fd)->type |= type_CONNECT_WRITE;
        return;
    }

    if(FD(m,fd)->type != type_NORMAL)
        return;

    // 调用c2s_router_mio_callback
    if(ACT(m, fd, action_WRITE, NULL) == 0) return;

    /* not all written, do more l8r */
    MIO_SET_WRITE(m, FD(m,fd));
}

实际的读操作要等待到下一轮FD轮询时才会执行,所以这里以_mio__connect来说明,此时希望写入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** internally change a connecting socket to a normal one */
static void _mio__connect(mio_t m, mio_fd_t fd)
{
mio_type_t type = FD(m,fd)->type;

mio_debug(ZONE, "connect processing for fd #%d", fd->fd);
/* reset type and clear the "write" event that flags connect() is done */
FD(m,fd)->type = type_NORMAL;
MIO_UNSET_WRITE(m,FD(m,fd));

/* if the app had asked to do anything in the meantime, do those now */
if(type & type_CONNECT_READ) mio_read(m,fd);
if(type & type_CONNECT_WRITE) mio_write(m,fd);
}

接下来其实就是函数回调了, 没什么特殊的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int c2s_router_mio_callback(mio_t m, mio_action_t a, mio_fd_t fd, void *data, void *arg) {
c2s_t c2s = (c2s_t) arg;
int nbytes;
...
switch(a) {
...
case action_WRITE:
log_debug(ZONE, "write action on fd %d", fd->fd);
return sx_can_write(c2s->router);
    ...
}

int sx_can_write(sx_t s) {
    /* 整个函数有好几种触发_sx_event的情况,
        event_WRITE 是它的主要目的 */
    sx_buf_t out;
int ret, written;
    ...
    /* get the callback to do the write */
_sx_debug(ZONE, "handing app %d bytes to write", out->len);
written = _sx_event(s, event_WRITE, (void *) out);
    ...
}

int __sx_event(const char *file, int line, sx_t s, sx_event_t e, void *data) {
int ret;
_sx_debug(file, line, "tag %d event %d data 0x%x", s->tag, e, data);
s->reentry++;
    // 调用c2s_router_sx_callback
ret = (s->cb)(s, e, data, s->cb_arg);
s->reentry--;    return ret;
}

int c2s_router_sx_callback(sx_t s, sx_event_t e, void *data, void *arg) {
    c2s_t c2s = (c2s_t) arg;
    sx_buf_t buf = (sx_buf_t) data;

    ...
    switch(e) {
        ...
        case event_WRITE:
            log_debug(ZONE, "writing to %d", c2s->fd->fd);

            len = send(c2s->fd->fd, buf->data, buf->len, 0);
            if(len >= 0) {
                log_debug(ZONE, "%d bytes written", len);
                return len;
            }

     ...
}

跟踪它代码后发现,与router建立连接后才建立起监听服务端,而且还会再监听一个FIFO文件管道,代码在c2s/pbx.cc2s/pbx_commands.c里, 看上去是用来控制jid账户的小trick, 同样也有mio回调.

总结, 回调很多, 很难一时间就看出来作者的回调分支里的种种意图, 继续学习.