资讯专栏INFORMATION COLUMN

Swoole 源码分析——Client模块之Recv

ChanceWong / 885人阅读

摘要:判断客户端是否配置了检测或者长度检测,如果配置了就调用接受完整的数据包,这两天会调用,进而调用函数。异步客户端接受数据异步的客户端接受数据调用的和同步的客户端相同,都是调用函数。

recv 接受数据

客户端接受数据需要指定缓存区最大长度,就是下面的 buf_lenflags 用于指定是否设置 waitall 标志,如果设定了 waitall 就必须设定准确的 size,否则会一直等待,直到接收的数据长度达到 size

客户端启用了 EOF/Length 检测后,无需设置 sizewaitall 参数。扩展层会返回完整的数据包或者返回false

开启了 open_eof_check/open_length_check 选项之后,会自动进行包长检测,过程和服务端类似,此处不需要多说。

static PHP_METHOD(swoole_client, recv)
{
    zend_long buf_len = SW_PHP_CLIENT_BUFFER_SIZE;
    zend_long flags = 0;
    int ret;
    char *buf = NULL;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &buf_len, &flags) == FAILURE)
    {
        return;
    }

    //waitall
    if (flags == 1)
    {
        flags = MSG_WAITALL;
    }

    swClient *cli = client_get_ptr(getThis() TSRMLS_CC);
    swProtocol *protocol = &cli->protocol;

    if (cli->open_eof_check)
    {
        if (cli->buffer == NULL)
        {
            cli->buffer = swString_new(SW_BUFFER_SIZE_BIG);
        }

        swString *buffer = cli->buffer;
        int eof = -1;

        if (buffer->length > 0)
        {
            goto find_eof;
        }

        while (1)
        {
            buf = buffer->str + buffer->length;
            buf_len = buffer->size - buffer->length;

            if (buf_len > SW_BUFFER_SIZE_BIG)
            {
                buf_len = SW_BUFFER_SIZE_BIG;
            }

            ret = cli->recv(cli, buf, buf_len, 0);
            if (ret < 0)
            {
                swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(errno), errno);
                buffer->length = 0;
                RETURN_FALSE;
            }
            else if (ret == 0)
            {
                buffer->length = 0;
                RETURN_EMPTY_STRING();
            }

            buffer->length += ret;

            if (buffer->length < protocol->package_eof_len)
            {
                continue;
            }

            find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol->package_eof, protocol->package_eof_len);
            if (eof >= 0)
            {
                eof += protocol->package_eof_len;
                SW_RETVAL_STRINGL(buffer->str, eof, 1);

                if (buffer->length > eof)
                {
                    buffer->length -= eof;
                    memmove(buffer->str, buffer->str + eof, buffer->length);
                }
                else
                {
                    buffer->length = 0;
                }
                return;
            }
            else
            {
                if (buffer->length == protocol->package_max_length)
                {
                    swoole_php_error(E_WARNING, "no package eof");
                    buffer->length = 0;
                    RETURN_FALSE;
                }
                else if (buffer->length == buffer->size)
                {
                    if (buffer->size < protocol->package_max_length)
                    {
                        int new_size = buffer->size * 2;
                        if (new_size > protocol->package_max_length)
                        {
                            new_size = protocol->package_max_length;
                        }
                        if (swString_extend(buffer, new_size) < 0)
                        {
                            buffer->length = 0;
                            RETURN_FALSE;
                        }
                    }
                }
            }
        }
        buffer->length = 0;
        RETURN_FALSE;
    }
    else if (cli->open_length_check)
    {
        if (cli->buffer == NULL)
        {
            cli->buffer = swString_new(SW_BUFFER_SIZE_STD);
        }

        uint32_t header_len = protocol->package_length_offset + protocol->package_length_size;
        ret = cli->recv(cli, cli->buffer->str, header_len, MSG_WAITALL);
        if (ret <= 0)
        {
            goto check_return;
        }
        else if (ret != header_len)
        {
            ret = 0;
            goto check_return;
        }

        buf_len = protocol->get_package_length(protocol, cli->socket, cli->buffer->str, ret);

        //error package
        if (buf_len < 0)
        {
            RETURN_EMPTY_STRING();
        }
        //empty package
        else if (buf_len == header_len)
        {
            SW_RETURN_STRINGL(cli->buffer->str, header_len, 1);
        }
        else if (buf_len > protocol->package_max_length)
        {
            swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "Package is too big. package_length=%d", (int )buf_len);
            RETURN_EMPTY_STRING();
        }

        buf = (char *) emalloc(buf_len + 1);
        memcpy(buf, cli->buffer->str, header_len);
        SwooleG.error = 0;
        ret = cli->recv(cli, buf + header_len, buf_len - header_len, MSG_WAITALL);
        if (ret > 0)
        {
            ret += header_len;
            if (ret != buf_len)
            {
                ret = 0;
            }
        }
    }
    else
    {
        if (!(flags & MSG_WAITALL) && buf_len > SW_PHP_CLIENT_BUFFER_SIZE)
        {
            buf_len = SW_PHP_CLIENT_BUFFER_SIZE;
        }
        buf = (char *) emalloc(buf_len + 1);
        SwooleG.error = 0;
        ret = cli->recv(cli, buf, buf_len, flags);
    }

    check_return:

    if (ret < 0)
    {
        SwooleG.error = errno;
        swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(SwooleG.error), SwooleG.error);
        zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC);
        swoole_efree(buf);
        RETURN_FALSE;
    }
    else
    {
        if (ret == 0)
        {
            swoole_efree(buf);
            RETURN_EMPTY_STRING();
        }
        else
        {
            buf[ret] = 0;
            SW_RETVAL_STRINGL(buf, ret, 0);
        }
    }
}
swClient_tcp_recv_no_buffer 同步 TCP 客户端接受函数

上一小节中的 cli->recv 实际调用的是 swClient_tcp_recv_no_buffer 函数,无论是同步客户端还是异步客户端都是这个函数,该函数会调用 swConnection_recv 接受数据,直到达到超时时间。

值得注意的是这个函数中的 flag 如果是 MSG_WAITALL 标志,recv 会等待所有的数据(长度为 len 的数据)全部到达之后才会返回。

static int swClient_tcp_recv_no_buffer(swClient *cli, char *data, int len, int flag)
{
    int ret;

    while (1)
    {
        ret = swConnection_recv(cli->socket, data, len, flag);
        if (ret >= 0)
        {
            break;
        }
        if (errno == EINTR)
        {
            if (cli->interrupt_time <= 0)
            {
                cli->interrupt_time = swoole_microtime();
            }
            else if (swoole_microtime() > cli->interrupt_time + cli->timeout)
            {
                break;
            }
            else
            {
                continue;
            }
        }
#ifdef SW_USE_OPENSSL
        if (errno == EAGAIN && cli->socket->ssl)
        {
            int timeout_ms = (int) (cli->timeout * 1000);
            if (cli->socket->ssl_want_read && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_READ) == SW_OK)
            {
                continue;
            }
            else if (cli->socket->ssl_want_write && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_WRITE) == SW_OK)
            {
                continue;
            }
        }
#endif
        break;
    }

    return ret;
}
swClient_udp_recv 同步 UDP 客户端

对于 UDP 来说,cli->recv 就是函数 swClient_udp_recv, 本函数会尝试调用两次 recvfrom:

static int swClient_udp_recv(swClient *cli, char *data, int length, int flags)
{
    cli->remote_addr.len = sizeof(cli->remote_addr.addr);
    int ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr.addr, &cli->remote_addr.len);
    if (ret < 0)
    {
        if (errno == EINTR)
        {
            ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr, &cli->remote_addr.len);
        }
        else
        {
            return SW_ERR;
        }
    }
    return ret;
}

swClient_onStreamRead 异步 TCP 客户端读就绪

对于异步 TCP 数据的接受,首先与异步客户端的写就绪类似,首先要判断当前的 SSL 的状态是否是 SW_SSL_STATE_WAIT_STREAM,再次进行 SSL 握手(具体原因不太清楚)。

判断客户端是否配置了 EOF 检测或者长度检测,如果配置了就调用 swProtocol_recv_check_eof/swProtocol_recv_check_length 接受完整的数据包,这两天会调用 swClient_onPackage,进而调用 onReceive 函数。

如果没有配置,那么就简单的调用 swConnection_recv 接受数据,接受到数据之后会调用 onReceive

static int swClient_onPackage(swConnection *conn, char *data, uint32_t length)
{
    swClient *cli = (swClient *) conn->object;
    cli->onReceive(conn->object, data, length);
    return conn->close_wait ? SW_ERR : SW_OK;
}

static int swClient_onStreamRead(swReactor *reactor, swEvent *event)
{
    int n;
    swClient *cli = event->socket->object;
    char *buf = cli->buffer->str + cli->buffer->length;
    long buf_size = cli->buffer->size - cli->buffer->length;

#ifdef SW_USE_OPENSSL
    if (cli->open_ssl && cli->socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)
    {
        if (swClient_ssl_handshake(cli) < 0)
        {
            goto connect_fail;
        }
        if (cli->socket->ssl_state != SW_SSL_STATE_READY)
        {
            return SW_OK;
        }
        //ssl handshake sucess
        else if (cli->onConnect)
        {
            execute_onConnect(cli);
        }
    }
#endif

    if (cli->open_eof_check || cli->open_length_check)
    {
        swConnection *conn = cli->socket;
        swProtocol *protocol = &cli->protocol;

        if (cli->open_eof_check)
        {
            n = swProtocol_recv_check_eof(protocol, conn, cli->buffer);
        }
        else
        {
            n = swProtocol_recv_check_length(protocol, conn, cli->buffer);
        }

        if (n < 0)
        {
            return  cli->close(cli);
        }
        else
        {
            if (conn->removed == 0 && cli->remove_delay)
            {
                swClient_sleep(cli);
                cli->remove_delay = 0;
            }
            return SW_OK;
        }
    }

#ifdef SW_CLIENT_RECV_AGAIN
    recv_again:
#endif
    n = swConnection_recv(event->socket, buf, buf_size, 0);
    if (n < 0)
    {
        __error:
        switch (swConnection_error(errno))
        {
        case SW_ERROR:
            swSysError("Read from socket[%d] failed.", event->fd);
            return SW_OK;
        case SW_CLOSE:
            goto __close;
        case SW_WAIT:
            return SW_OK;
        default:
            return SW_OK;
        }
    }
    else if (n == 0)
    {
        __close:
        return  cli->close(cli);
    }
    else
    {
        cli->onReceive(cli, buf, n);
#ifdef SW_CLIENT_RECV_AGAIN
        if (n == buf_size)
        {
            goto recv_again;
        }
#endif
        return SW_OK;
    }
    return SW_OK;
}
swClient_onDgramRead 异步 UDP 客户端接受数据

异步的 UDP 客户端接受数据调用的和同步的客户端相同,都是调用 swClient_udp_recv 函数。

static int swClient_onDgramRead(swReactor *reactor, swEvent *event)
{
    swClient *cli = event->socket->object;
    char buffer[SW_BUFFER_SIZE_UDP];

    int n = swClient_udp_recv(cli, buffer, sizeof(buffer), 0);
    if (n < 0)
    {
        return SW_ERR;
    }
    else
    {
        cli->onReceive(cli, buffer, n);
    }
    return SW_OK;
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29497.html

相关文章

  • Swoole 源码分析——Server模块Stream 模式

    摘要:新建可以看到,自动采用包长检测的方法该函数主要功能是设置各种回调函数值得注意的是第三个参数代表是否异步。发送数据函数并不是直接发送数据,而是将数据存储在,等着写事件就绪之后调用发送数据。 swReactorThread_dispatch 发送数据 reactor 线程会通过 swReactorThread_dispatch 发送数据,当采用 stream 发送数据的时候,会调用 sw...

    wums 评论0 收藏0
  • Swoole4.x探究多进程TCP协程服务实现

    摘要:有研究过框架的同学就会发现,其实最核心的,就是用了拓展加上拓展来实现其底层的网络服务和多进程调度。我们在模式下,测试起五个进程主进程要等待回收我们,这样就很简单的实现了一个多进程的协程服务。 有研究过Workman框架的同学就会发现,其实workman最核心的,就是用了php socket拓展加上pcntl拓展来实现其底层的网络服务和多进程调度。那我们今天就来探讨如何使用Swoole的...

    ad6623 评论0 收藏0
  • Swoole 源码分析——Server模块ReactorThread事件循环(下)

    摘要:之后如果仍然有剩余未发送的数据,那么就如果已经没有剩余数据了,继续去取下一个数据包。拿到后,要用函数转化为相应的类型即可得到包长值。 swPort_onRead_check_eof EOF 自动分包 我们前面说过,swPort_onRead_raw 是最简单的向 worker 进程发送数据包的方法,swoole 会将从客户端接受到的数据包,立刻发送给 worker 进程,用户自己把...

    Maxiye 评论0 收藏0
  • Swoole 源码分析——Client模块Connect

    摘要:两个函数是可选回调函数。附带了一组可信任证书。应该注意的是,验证失败并不意味着连接不能使用。在对证书进行验证时,有一些安全性检查并没有执行,包括证书的失效检查和对证书中通用名的有效性验证。 前言 swoole_client 提供了 tcp/udp socket 的客户端的封装代码,使用时仅需 new swoole_client 即可。 swoole 的 socket client 对比...

    Charles 评论0 收藏0
  • Swoole 源码分析——Server模块OpenSSL(下)

    摘要:对于服务端来说,缓存默认是不能使用的,可以通过调用函数来进行设置生效。在回调函数中,首先申请一个大数数据结构,然后将其设定为,该值表示公钥指数,然后利用函数生成秘钥。此时需要调用函数将新的连接与绑定。 前言 上一篇文章我们讲了 OpenSSL 的原理,接下来,我们来说说如何利用 openssl 第三方库进行开发,来为 tcp 层进行 SSL 隧道加密 OpenSSL 初始化 在 sw...

    LiuRhoRamen 评论0 收藏0

发表评论

0条评论

ChanceWong

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<