博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Swoole 源码分析——Client模块之Recv
阅读量:6603 次
发布时间:2019-06-24

本文共 10599 字,大约阅读时间需要 35 分钟。

recv 接受数据

客户端接受数据需要指定缓存区最大长度,就是下面的 buf_lenflags 用于指定是否设置 waitall 标志,如果设定了 waitall 就必须设定准确的 size,否则会一直等待,直到接收的数据长度达到 size

客户端启用了 EOF/Length 检测后,无需设置 sizewaitall 参数。扩展层会返回完整的数据包或者返回false

开启了 open_eof_check/open_length_check 选项之后,会自动进行包长检测,过程和服务端类似,此处不需要多说。

static PHP_METHOD(swoole_client, recv){    zend_long buf_len = SW_PHP_CLIENT_BUFFER_SIZE;    zend_long flags = 0;    int ret;    char *buf = NULL;    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &buf_len, &flags) == FAILURE)    {        return;    }    //waitall    if (flags == 1)    {        flags = MSG_WAITALL;    }    swClient *cli = client_get_ptr(getThis() TSRMLS_CC);    swProtocol *protocol = &cli->protocol;    if (cli->open_eof_check)    {        if (cli->buffer == NULL)        {            cli->buffer = swString_new(SW_BUFFER_SIZE_BIG);        }        swString *buffer = cli->buffer;        int eof = -1;        if (buffer->length > 0)        {            goto find_eof;        }        while (1)        {            buf = buffer->str + buffer->length;            buf_len = buffer->size - buffer->length;            if (buf_len > SW_BUFFER_SIZE_BIG)            {                buf_len = SW_BUFFER_SIZE_BIG;            }            ret = cli->recv(cli, buf, buf_len, 0);            if (ret < 0)            {                swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(errno), errno);                buffer->length = 0;                RETURN_FALSE;            }            else if (ret == 0)            {                buffer->length = 0;                RETURN_EMPTY_STRING();            }            buffer->length += ret;            if (buffer->length < protocol->package_eof_len)            {                continue;            }            find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol->package_eof, protocol->package_eof_len);            if (eof >= 0)            {                eof += protocol->package_eof_len;                SW_RETVAL_STRINGL(buffer->str, eof, 1);                if (buffer->length > eof)                {                    buffer->length -= eof;                    memmove(buffer->str, buffer->str + eof, buffer->length);                }                else                {                    buffer->length = 0;                }                return;            }            else            {                if (buffer->length == protocol->package_max_length)                {                    swoole_php_error(E_WARNING, "no package eof");                    buffer->length = 0;                    RETURN_FALSE;                }                else if (buffer->length == buffer->size)                {                    if (buffer->size < protocol->package_max_length)                    {                        int new_size = buffer->size * 2;                        if (new_size > protocol->package_max_length)                        {                            new_size = protocol->package_max_length;                        }                        if (swString_extend(buffer, new_size) < 0)                        {                            buffer->length = 0;                            RETURN_FALSE;                        }                    }                }            }        }        buffer->length = 0;        RETURN_FALSE;    }    else if (cli->open_length_check)    {        if (cli->buffer == NULL)        {            cli->buffer = swString_new(SW_BUFFER_SIZE_STD);        }        uint32_t header_len = protocol->package_length_offset + protocol->package_length_size;        ret = cli->recv(cli, cli->buffer->str, header_len, MSG_WAITALL);        if (ret <= 0)        {            goto check_return;        }        else if (ret != header_len)        {            ret = 0;            goto check_return;        }        buf_len = protocol->get_package_length(protocol, cli->socket, cli->buffer->str, ret);        //error package        if (buf_len < 0)        {            RETURN_EMPTY_STRING();        }        //empty package        else if (buf_len == header_len)        {            SW_RETURN_STRINGL(cli->buffer->str, header_len, 1);        }        else if (buf_len > protocol->package_max_length)        {            swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "Package is too big. package_length=%d", (int )buf_len);            RETURN_EMPTY_STRING();        }        buf = (char *) emalloc(buf_len + 1);        memcpy(buf, cli->buffer->str, header_len);        SwooleG.error = 0;        ret = cli->recv(cli, buf + header_len, buf_len - header_len, MSG_WAITALL);        if (ret > 0)        {            ret += header_len;            if (ret != buf_len)            {                ret = 0;            }        }    }    else    {        if (!(flags & MSG_WAITALL) && buf_len > SW_PHP_CLIENT_BUFFER_SIZE)        {            buf_len = SW_PHP_CLIENT_BUFFER_SIZE;        }        buf = (char *) emalloc(buf_len + 1);        SwooleG.error = 0;        ret = cli->recv(cli, buf, buf_len, flags);    }    check_return:    if (ret < 0)    {        SwooleG.error = errno;        swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(SwooleG.error), SwooleG.error);        zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC);        swoole_efree(buf);        RETURN_FALSE;    }    else    {        if (ret == 0)        {            swoole_efree(buf);            RETURN_EMPTY_STRING();        }        else        {            buf[ret] = 0;            SW_RETVAL_STRINGL(buf, ret, 0);        }    }}

swClient_tcp_recv_no_buffer 同步 TCP 客户端接受函数

上一小节中的 cli->recv 实际调用的是 swClient_tcp_recv_no_buffer 函数,无论是同步客户端还是异步客户端都是这个函数,该函数会调用 swConnection_recv 接受数据,直到达到超时时间。

值得注意的是这个函数中的 flag 如果是 MSG_WAITALL 标志,recv 会等待所有的数据(长度为 len 的数据)全部到达之后才会返回。

static int swClient_tcp_recv_no_buffer(swClient *cli, char *data, int len, int flag){    int ret;    while (1)    {        ret = swConnection_recv(cli->socket, data, len, flag);        if (ret >= 0)        {            break;        }        if (errno == EINTR)        {            if (cli->interrupt_time <= 0)            {                cli->interrupt_time = swoole_microtime();            }            else if (swoole_microtime() > cli->interrupt_time + cli->timeout)            {                break;            }            else            {                continue;            }        }#ifdef SW_USE_OPENSSL        if (errno == EAGAIN && cli->socket->ssl)        {            int timeout_ms = (int) (cli->timeout * 1000);            if (cli->socket->ssl_want_read && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_READ) == SW_OK)            {                continue;            }            else if (cli->socket->ssl_want_write && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_WRITE) == SW_OK)            {                continue;            }        }#endif        break;    }    return ret;}

swClient_udp_recv 同步 UDP 客户端

对于 UDP 来说,cli->recv 就是函数 swClient_udp_recv, 本函数会尝试调用两次 recvfrom:

static int swClient_udp_recv(swClient *cli, char *data, int length, int flags){    cli->remote_addr.len = sizeof(cli->remote_addr.addr);    int ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr.addr, &cli->remote_addr.len);    if (ret < 0)    {        if (errno == EINTR)        {            ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr, &cli->remote_addr.len);        }        else        {            return SW_ERR;        }    }    return ret;}

swClient_onStreamRead 异步 TCP 客户端读就绪

对于异步 TCP 数据的接受,首先与异步客户端的写就绪类似,首先要判断当前的 SSL 的状态是否是 SW_SSL_STATE_WAIT_STREAM,再次进行 SSL 握手(具体原因不太清楚)。

判断客户端是否配置了 EOF 检测或者长度检测,如果配置了就调用 swProtocol_recv_check_eof/swProtocol_recv_check_length 接受完整的数据包,这两天会调用 swClient_onPackage,进而调用 onReceive 函数。

如果没有配置,那么就简单的调用 swConnection_recv 接受数据,接受到数据之后会调用 onReceive

static int swClient_onPackage(swConnection *conn, char *data, uint32_t length){    swClient *cli = (swClient *) conn->object;    cli->onReceive(conn->object, data, length);    return conn->close_wait ? SW_ERR : SW_OK;}static int swClient_onStreamRead(swReactor *reactor, swEvent *event){    int n;    swClient *cli = event->socket->object;    char *buf = cli->buffer->str + cli->buffer->length;    long buf_size = cli->buffer->size - cli->buffer->length;#ifdef SW_USE_OPENSSL    if (cli->open_ssl && cli->socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)    {        if (swClient_ssl_handshake(cli) < 0)        {            goto connect_fail;        }        if (cli->socket->ssl_state != SW_SSL_STATE_READY)        {            return SW_OK;        }        //ssl handshake sucess        else if (cli->onConnect)        {            execute_onConnect(cli);        }    }#endif    if (cli->open_eof_check || cli->open_length_check)    {        swConnection *conn = cli->socket;        swProtocol *protocol = &cli->protocol;        if (cli->open_eof_check)        {            n = swProtocol_recv_check_eof(protocol, conn, cli->buffer);        }        else        {            n = swProtocol_recv_check_length(protocol, conn, cli->buffer);        }        if (n < 0)        {            return  cli->close(cli);        }        else        {            if (conn->removed == 0 && cli->remove_delay)            {                swClient_sleep(cli);                cli->remove_delay = 0;            }            return SW_OK;        }    }#ifdef SW_CLIENT_RECV_AGAIN    recv_again:#endif    n = swConnection_recv(event->socket, buf, buf_size, 0);    if (n < 0)    {        __error:        switch (swConnection_error(errno))        {        case SW_ERROR:            swSysError("Read from socket[%d] failed.", event->fd);            return SW_OK;        case SW_CLOSE:            goto __close;        case SW_WAIT:            return SW_OK;        default:            return SW_OK;        }    }    else if (n == 0)    {        __close:        return  cli->close(cli);    }    else    {        cli->onReceive(cli, buf, n);#ifdef SW_CLIENT_RECV_AGAIN        if (n == buf_size)        {            goto recv_again;        }#endif        return SW_OK;    }    return SW_OK;}

swClient_onDgramRead 异步 UDP 客户端接受数据

异步的 UDP 客户端接受数据调用的和同步的客户端相同,都是调用 swClient_udp_recv 函数。

static int swClient_onDgramRead(swReactor *reactor, swEvent *event){    swClient *cli = event->socket->object;    char buffer[SW_BUFFER_SIZE_UDP];    int n = swClient_udp_recv(cli, buffer, sizeof(buffer), 0);    if (n < 0)    {        return SW_ERR;    }    else    {        cli->onReceive(cli, buffer, n);    }    return SW_OK;}

转载地址:http://ycwso.baihongyu.com/

你可能感兴趣的文章
操作系统(四)---MS-DOS微软磁盘操作系统
查看>>
ajax提交表单
查看>>
FTP服务
查看>>
我的友情链接
查看>>
xcode8运行后后台打印网络相关的日志
查看>>
python语言中函数的传参与基本练习
查看>>
Java集合框架面试题
查看>>
Django1.7中注册、登陆、以及cookie的使用
查看>>
实现Lync Server 2010企业版前端服务器部署
查看>>
Java的主要就业方向
查看>>
关于使用mac进行文件远程操作
查看>>
Ubuntu Server 14.04 配置VNC
查看>>
我的友情链接
查看>>
Tcl命令操作实验-----(6)---case foreach
查看>>
Tcl命令操作实验-----(12) ---字典
查看>>
akka的Hello
查看>>
scala的视图界定
查看>>
PHP扩展之文件系统
查看>>
智能手机将替代PC成大众最常用终端 智能手机前景无限
查看>>
解决linux下用ntpdate同步时间
查看>>