资讯专栏INFORMATION COLUMN

Swoole 源码分析——Reactor 模块之 ReactorEpoll

leanxi / 2964人阅读

摘要:对象的创建在中,最为高效的机制就是。该数据结构中是的,用于在函数接受就绪的事件。为了能够更为简便在调用后获取的类型,并不会仅仅向函数添加,而是会添加类型,该数据结构中包含文件描述符和文件类型。

Epoll 对象的创建

linux 中,最为高效的 reactor 机制就是 epollswReactorobject 会存储 epoll 的对象 swReactorEpoll_s。该数据结构中 epfdepollidevents 用于在 epoll_wait 函数接受就绪的事件。

该函数最重要的是 epoll_create,该函数会创建 epoll 对象

typedef struct swReactorEpoll_s swReactorEpoll;

struct swReactorEpoll_s
{
    int epfd;
    struct epoll_event *events;
};

int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
    //create reactor object
    swReactorEpoll *reactor_object = sw_malloc(sizeof(swReactorEpoll));
    if (reactor_object == NULL)
    {
        swWarn("malloc[0] failed.");
        return SW_ERR;
    }
    bzero(reactor_object, sizeof(swReactorEpoll));
    reactor->object = reactor_object;
    reactor->max_event_num = max_event_num;

    reactor_object->events = sw_calloc(max_event_num, sizeof(struct epoll_event));

    if (reactor_object->events == NULL)
    {
        swWarn("malloc[1] failed.");
        sw_free(reactor_object);
        return SW_ERR;
    }
    //epoll create
    reactor_object->epfd = epoll_create(512);
    if (reactor_object->epfd < 0)
    {
        swWarn("epoll_create failed. Error: %s[%d]", strerror(errno), errno);
        sw_free(reactor_object);
        return SW_ERR;
    }
    //binding method
    reactor->add = swReactorEpoll_add;
    reactor->set = swReactorEpoll_set;
    reactor->del = swReactorEpoll_del;
    reactor->wait = swReactorEpoll_wait;
    reactor->free = swReactorEpoll_free;

    return SW_OK;
}
Epoll 添加监听

swReactorEpoll_event_set 函数用于转化可读(SW_EVENT_READ)、可写(SW_EVENT_WRITE )的状态为 epoll 函数可用的 EPOLLINEPOLLOUTEPOLLERR

static sw_inline int swReactorEpoll_event_set(int fdtype)
{
    uint32_t flag = 0;
    if (swReactor_event_read(fdtype))
    {
        flag |= EPOLLIN;
    }
    if (swReactor_event_write(fdtype))
    {
        flag |= EPOLLOUT;
    }
    if (swReactor_event_error(fdtype))
    {
        //flag |= (EPOLLRDHUP);
        flag |= (EPOLLRDHUP | EPOLLHUP | EPOLLERR);
    }
    return flag;
}

swReactorEpoll_add 函数用于为 reactor 添加新的文件描述符进行监控

添加 fd 最为重要的的是利用 epoll_ctl 函数的 EPOLL_CTL_ADD 命令。为了能够更为简便在调用 epoll_wait 后获取 fd 的类型,并不会仅仅向 epoll_ctl 函数添加 fd,而是会添加 swFd 类型,该数据结构中包含文件描述符和文件类型。

swReactor_add 函数用于更新 reactor->socket_listfdtypeevents

最后需要自增 event_num 的数值

typedef struct _swFd
{
    uint32_t fd;
    uint32_t fdtype;
} swFd;

static int swReactorEpoll_add(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *object = reactor->object;
    struct epoll_event e;
    swFd fd_;
    bzero(&e, sizeof(struct epoll_event));

    fd_.fd = fd;
    fd_.fdtype = swReactor_fdtype(fdtype);
    e.events = swReactorEpoll_event_set(fdtype);

    swReactor_add(reactor, fd, fdtype);

    memcpy(&(e.data.u64), &fd_, sizeof(fd_));
    if (epoll_ctl(object->epfd, EPOLL_CTL_ADD, fd, &e) < 0)
    {
        swSysError("add events[fd=%d#%d, type=%d, events=%d] failed.", fd, reactor->id, fd_.fdtype, e.events);
        swReactor_del(reactor, fd);
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_EVENT, "add event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));
    reactor->event_num++;

    return SW_OK;
}

static sw_inline void swReactor_add(swReactor *reactor, int fd, int type)
{
    swConnection *socket = swReactor_get(reactor, fd);
    socket->fdtype = swReactor_fdtype(type);
    socket->events = swReactor_events(type);
    socket->removed = 0;
}
Epoll 修改监听

修改监听主要调用 epoll_ctlEPOLL_CTL_MOD 命令

static int swReactorEpoll_set(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *object = reactor->object;
    swFd fd_;
    struct epoll_event e;
    int ret;

    bzero(&e, sizeof(struct epoll_event));
    e.events = swReactorEpoll_event_set(fdtype);

    if (e.events & EPOLLOUT)
    {
        assert(fd > 2);
    }

    fd_.fd = fd;
    fd_.fdtype = swReactor_fdtype(fdtype);
    memcpy(&(e.data.u64), &fd_, sizeof(fd_));

    ret = epoll_ctl(object->epfd, EPOLL_CTL_MOD, fd, &e);
    if (ret < 0)
    {
        swSysError("reactor#%d->set(fd=%d|type=%d|events=%d) failed.", reactor->id, fd, fd_.fdtype, e.events);
        return SW_ERR;
    }
    swTraceLog(SW_TRACE_EVENT, "set event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));
    //execute parent method
    swReactor_set(reactor, fd, fdtype);
    return SW_OK;
}
Epoll 删除监听

修改监听主要调用 epoll_ctlEPOLL_CTL_DEL 命令

最后需要更新 event_num

static int swReactorEpoll_del(swReactor *reactor, int fd)
{
    swReactorEpoll *object = reactor->object;
    if (epoll_ctl(object->epfd, EPOLL_CTL_DEL, fd, NULL) < 0)
    {
        swSysError("epoll remove fd[%d#%d] failed.", fd, reactor->id);
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_REACTOR, "remove event[reactor_id=%d|fd=%d]", reactor->id, fd);
    reactor->event_num = reactor->event_num <= 0 ? 0 : reactor->event_num - 1;
    swReactor_del(reactor, fd);

    return SW_OK;
}
Epoll 监听等待就绪

swReactorEpoll_waitreactor 的核心,该函数最重要的就是调用 epoll_wait

首先需要通过 timeo 参数设置 msec,利用 object->events 设置 events

epoll_wait 函数返回之后,如果 n<0,那么需要先检查 erron,如果是 EINTR,那么说明有信号触发,此时需要进行信号的回调函数,然后再继续事件循环。如果不是 EINTR,那么就要返回错误,结束事件循环

如果 n == 0,一般是由于 epoll_wait 已超时,此时需要调用超时回调函数

如果 n > 0,那么就要从 events 中取出已经就绪的 swFd 对象,并利用该对象的值初始化 event

接下来就要检查 events[i].events 的值,来判断具体是读就绪、写就绪还是发生了错误,值得注意的是 EPOLLRDHUP 事件,此事件代表着对端断开连接,这个是 linux 自从 2.6.17 的新特性

利用 swReactor_getHandle 函数取出对应的文件描述符类型的事件回调函数

事件循环的最后调用 onFinish 函数

如果设置了 once,说明此 reactor 只会循环一次,立即退出;否则,继续事件循环

typedef struct _swEvent
{
    int fd;
    int16_t from_id;
    uint8_t type;
    swConnection *socket;
} swEvent;

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    swEvent event;
    swReactorEpoll *object = reactor->object;
    swReactor_handle handle;
    int i, n, ret, msec;

    int reactor_id = reactor->id;
    int epoll_fd = object->epfd;
    int max_event_num = reactor->max_event_num;
    struct epoll_event *events = object->events;

    if (reactor->timeout_msec == 0)
    {
        if (timeo == NULL)
        {
            reactor->timeout_msec = -1;
        }
        else
        {
            reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }

    reactor->start = 1;

    while (reactor->running > 0)
    {
        if (reactor->onBegin != NULL)
        {
            reactor->onBegin(reactor);
        }
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
            if (swReactor_error(reactor) < 0)
            {
                swWarn("[Reactor#%d] epoll_wait failed. Error: %s[%d]", reactor_id, strerror(errno), errno);
                return SW_ERR;
            }
            else
            {
                continue;
            }
        }
        else if (n == 0)
        {
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            continue;
        }
        for (i = 0; i < n; i++)
        {
            event.fd = events[i].data.u64;
            event.from_id = reactor_id;
            event.type = events[i].data.u64 >> 32;
            event.socket = swReactor_get(reactor, event.fd);

            //read
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLIN handle failed. fd=%d.", event.fd);
                }
            }
            //write
            if ((events[i].events & EPOLLOUT) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLOUT handle failed. fd=%d.", event.fd);
                }
            }
            //error
#ifndef NO_EPOLLRDHUP
            if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#else
            if ((events[i].events & (EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#endif
            {
                //ignore ERR and HUP, because event is already processed at IN and OUT handler.
                if ((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT))
                {
                    continue;
                }
                handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLERR handle failed. fd=%d.", event.fd);
                }
            }
        }

        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
        if (reactor->once)
        {
            break;
        }
    }
    return 0;
}

static sw_inline int swReactor_error(swReactor *reactor)
{
    switch (errno)
    {
    case EINTR:
        if (reactor->singal_no)
        {
            swSignal_callback(reactor->singal_no);
            reactor->singal_no = 0;
        }
        return SW_OK;
    }
    return SW_ERR;
}

static sw_inline swReactor_handle swReactor_getHandle(swReactor *reactor, int event_type, int fdtype)
{
    if (event_type == SW_EVENT_WRITE)
    {
        return (reactor->write_handle[fdtype] != NULL) ? reactor->write_handle[fdtype] : reactor->handle[SW_FD_WRITE];
    }
    else if (event_type == SW_EVENT_ERROR)
    {
        return (reactor->error_handle[fdtype] != NULL) ? reactor->error_handle[fdtype] : reactor->handle[SW_FD_CLOSE];
    }
    return reactor->handle[fdtype];
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29217.html

相关文章

  • Swoole 源码分析——Client模块Send

    摘要:当此时的套接字不可写的时候,会自动放入缓冲区中。当大于高水线时,会自动调用回调函数。写就绪状态当监控到套接字进入了写就绪状态时,就会调用函数。如果为,说明此时异步客户端虽然建立了连接,但是还没有调用回调函数,因此这时要调用函数。 前言 上一章我们说了客户端的连接 connect,对于同步客户端来说,连接已经建立成功;但是对于异步客户端来说,此时可能还在进行 DNS 的解析,on...

    caozhijian 评论0 收藏0
  • Swoole 源码分析——Server模块Stream 模式

    摘要:新建可以看到,自动采用包长检测的方法该函数主要功能是设置各种回调函数值得注意的是第三个参数代表是否异步。发送数据函数并不是直接发送数据,而是将数据存储在,等着写事件就绪之后调用发送数据。 swReactorThread_dispatch 发送数据 reactor 线程会通过 swReactorThread_dispatch 发送数据,当采用 stream 发送数据的时候,会调用 sw...

    wums 评论0 收藏0
  • Swoole 源码分析——Reactor模块ReactorBase

    前言 作为一个网络框架,最为核心的就是消息的接受与发送。高效的 reactor 模式一直是众多网络框架的首要选择,本节主要讲解 swoole 中的 reactor 模块。 UNP 学习笔记——IO 复用 Reactor 的数据结构 Reactor 的数据结构比较复杂,首先 object 是具体 Reactor 对象的首地址,ptr 是拥有 Reactor 对象的类的指针, event_nu...

    baukh789 评论0 收藏0
  • Swoole 源码分析——基础模块 Pipe 管道

    摘要:并没有使用命名管道。的创建创建匿名管道就是调用函数,程序自动设置管道为非阻塞式。函数同样的获取管道文件描述符根据来决定。模块负责为进程创建与。当线程启动的时候,会将加入的监控当中。 前言 管道是进程间通信 IPC 的最基础的方式,管道有两种类型:命名管道和匿名管道,匿名管道专门用于具有血缘关系的进程之间,完成数据传递,命名管道可以用于任何两个进程之间。swoole 中的管道都是匿名管道...

    Tikitoo 评论0 收藏0
  • Swoole 源码分析——Server模块Start

    摘要:是缓存区高水位线,达到了说明缓冲区即将满了创建线程函数用于将监控的存放于中向中添加监听的文件描述符等待所有的线程开启事件循环利用创建线程,线程启动函数是保存监听本函数将用于监听的存放到当中,并设置相应的属性 Server 的启动 在 server 启动之前,swoole 首先要调用 php_swoole_register_callback 将 PHP 的回调函数注册到 server...

    3fuyu 评论0 收藏0

发表评论

0条评论

leanxi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<