资讯专栏INFORMATION COLUMN

swoole_event_add实现异步

Cympros / 667人阅读

摘要:提供了函数,可以实现异步。如何变成异步呢实现异步客户端由于读取响应数据是同步堵塞的,我们将加入到事件监听后,底层会自动将该设置为非阻塞模式。而是基于实现的异步客户端,没有并发限制,可在一个进程内同时并发上万请求。

swoole提供了swoole_event_add函数,可以实现异步。此函数可以用在Server或Client模式下。

swoole_event_add属于AsyncIO,必须运行在CLI 模式。
异步tcp客户端 stream_socket_client实现tcp同步客户端

示例:


上述代码是同步执行的。如何变成异步呢?

stream_socket_client实现tcp异步客户端

由于fread读取响应数据是同步堵塞的,我们将$fp加入到事件监听后,底层会自动将该socket设置为非阻塞模式。修改fread那一行:

swoole_event_add($fp, function($fp) {
    echo $resp = fread($fp, 8192);
    swoole_event_del($fp);//socket处理完成后,从epoll事件中移除socket
    fclose($fp);
});
echo "Finish
";  //swoole_event_add不会阻塞进程,这行代码会顺序执行

执行后输出:

Finish
use time:0.087 s
HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 21 Apr 2018 08:36:40 GMT
Content-Type: application/json
Content-Length: 26
Connection: keep-alive
x-oss-request-id: 5ADAF81884D23C965A5D2614
Accept-Ranges: bytes
ETag: "3B3B50D9C802324BB72A74FCD9060004"
Last-Modified: Sat, 21 Apr 2018 04:43:33 GMT
x-oss-object-type: Normal
x-oss-hash-crc64ecma: 9917578698767912878
x-oss-storage-class: Standard
Content-MD5: OztQ2cgCMku3KnT82QYABA==
x-oss-server-time: 5

{"url":"http://52fhy.com"}

swoole_event_add函数原型

bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null,
    int $flags = null);

$sock可以为以下四种类型:

int,就是文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其他fd

stream资源,就是stream_socket_client/fsockopen创建的资源

sockets资源,就是sockets扩展中socket_create创建的资源,需要在编译时加入 ./configure --enable-sockets

object,swoole_process或swoole_client,底层自动转换为管道或客户端连接的socket

多个tcp客户端实时交互

上面的例子,已经实现了异步tcp客户端。接下来的例子会复杂些:可以在客户端A输入,客户端B能实时收到,反之也可以。

首先,我们得创建个tcp_server:

swoole_tcp_server.php

on("Start", function(){
    echo "Tcp server start. Waiting client... 
";
});

$serv->on("Connect", function($serv, $fd){
    echo "New client fd:{$fd}. 
";
});

$serv->on("Receive", function($serv, $fd, $from_id, $data){
    echo "Recv msg from fd:{$fd}:{$data}
";
    foreach ($serv->connections as $client) {
        if($fd != $client){
            $serv->send($client, $data);
        }
    }
});

$serv->on("Close", function($serv, $fd){
    echo "Client fd:{$fd} closed. 
";
});

$serv->start();

然后实现客户端:

event_add_tcp_client.php


先运行服务端:

$ php swoole_tcp_server.php
Tcp server start. Waiting client...

打开两个终端,运行客户端:

$ php event_add_tcp_client.php
ENTER MSG:

效果图:

swoole_client

其实swoole已经提供了异步的swoole_client,无需使用stream_socket_*系列函数:

on("connect", function(swoole_client $cli) {
    
});
$client->on("receive", function(swoole_client $cli, $data){
    echo "Receive: $data";
    $cli->send(str_repeat("A", 100)."
");
    sleep(1);
});
$client->on("error", function(swoole_client $cli){
    echo "error
";
});
$client->on("close", function(swoole_client $cli){
    echo "Connection close
";
});
$client->connect("127.0.0.1", 9001);

还有swoole实现的tcp/udp同步阻塞客户端:

$client = new swoole_client(SWOOLE_SOCK_TCP);
if (!$client->connect("127.0.0.1", 9001, -1))
{
    exit("connect failed. Error: {$client->errCode}
");
}
$client->send("hello world
");
echo $client->recv();
$client->close();

swoole_client 函数原型:

swoole_client->__construct(int $sock_type, int $is_sync = SWOOLE_SOCK_SYNC, string $key);

可以使用swoole提供的宏来之指定类型,请参考 swoole常量定义

$sock_type表示socket的类型,如TCP/UDP

使用$sock_type | SWOOLE_SSL可以启用SSL加密
$is_sync表示同步阻塞还是异步非阻塞,默认为同步阻塞

$key用于长连接的Key,默认使用IP:PORT作为key。相同key的连接会被复用

php-fpm/apache环境下只能使用同步客户端。异步客户端只能使用在cli命令行环境。

异步http客户端

curl或者file_get_contents发送http请求是同步阻塞的。基于swoole_event_add封装可以实现异步。

swoole_event_add实现异步http客户端

event_add_http_client.php

http($url, $method, $postfields, $headers);
            $worker->write($response);
        }, true);
        $process->start();

        //异步读取
        swoole_event_add($process->pipe, function($pipe) use ($process){
            $response = $process->read();
            // print_r($response);
            if(is_callable($this->callback)){
                call_user_func($this->callback, $response); //回调
            }
            swoole_event_del($pipe);
        });
    }

    public function setCallback($callback){
        $this->callback = $callback;
    }

    /**
     * http请求
     */
    private function http($url, $method, $postfields = NULL, $headers = array()) {
        try{
            $ssl = stripos($url,"https://") === 0 ? true : false;
            $ci = curl_init();
            /* Curl settings */
            curl_setopt($ci, CURLOPT_USERAGENT, @$_SERVER["HTTP_USER_AGENT"]); //在HTTP请求中包含一个"User-Agent: "头的字符串。    
            curl_setopt($ci, CURLOPT_CONNECTTIMEOUT, 30);
            curl_setopt($ci, CURLOPT_TIMEOUT, 30);
            curl_setopt($ci, CURLOPT_RETURNTRANSFER, TRUE);
            curl_setopt($ci, CURLOPT_ENCODING, "");
            if ($ssl) {
                curl_setopt($ci, CURLOPT_SSL_VERIFYPEER, 0); // 对认证证书来源的检查 
                curl_setopt($ci, CURLOPT_SSL_VERIFYHOST, 2); // 从证书中检查SSL加密算法是否存在
            }
            curl_setopt($ci, CURLOPT_HEADER, FALSE);
    
            switch ($method) {
                case "POST":
                    curl_setopt($ci, CURLOPT_POST, TRUE);
                    if (!empty($postfields)) {
                        curl_setopt($ci, CURLOPT_POSTFIELDS, $postfields);
                    }
                    break;
            }
    
            curl_setopt($ci, CURLOPT_URL, $url );
            curl_setopt($ci, CURLOPT_HTTPHEADER, $headers );
            curl_setopt($ci, CURLINFO_HEADER_OUT, TRUE );
    
            $response = curl_exec($ci);
            $httpCode = curl_getinfo($ci, CURLINFO_HTTP_CODE);
            $httpInfo = curl_getinfo($ci);
            
            if (FALSE === $response)
                throw new Exception(curl_error($ci), curl_errno($ci));
        
        } catch(Exception $e) {
            throw $e;
        }
        
        //echo "
";
        //var_dump($response);
        //var_dump($httpInfo);

        curl_close ($ci);
        return $response;
    }
}

$client = new HttpClient("http://www.52fhy.com/test.json");
$client->setCallback(function($response){
    print_r($response);
});
echo "OK
";

运行:

$ php event_add_http_client.php
OK
{"url":"http://52fhy.com"}[

由返回结果可以看出,客户端请求是异步执行的。

swoole_http_client

Swoole也内置了http异步客户端(swoole>=1.8.0)。

相比curl和file_get_contents这样PHP提供的Http客户端,swoole_http_client最大的优势是支持大量并发。
file_get_contents只能同时请求一个URL,并发只能通过开启多进程实现。curl提供了curl_multi功能实现并发基于select和多线程。并发能力都很差。而swoole_http_client是基于epoll实现的异步客户端,没有并发限制,可在一个进程内同时并发上万请求。更多介绍详见swoole文档。

示例:

get:

$cli = new swoole_http_client("www.52fhy.com", 80);
$cli->setHeaders(["User-Agent" => "swoole"]);

$cli->get("/test.json", function ($cli) {
    echo $cli->body;
});

echo "ok
";

输出:

ok
{"url":"http://52fhy.com"}

post:

$cli = new swoole_http_client("127.0.0.1", 81); 
$cli->post("/post_demo.php", array("a" => "1234", "b" => "456"), function ($cli) {
    echo "Length: " . strlen($cli->body) . "
";
    echo $cli->body;
});

echo "ok
";

websocket:

on("message", function ($_cli, $frame) {
    // var_dump($frame);
    echo $frame->data;
});

$cli->upgrade("/", function ($cli) {
    $cli->push("hello world");
});

echo "ok
";

发送完客户端会立即close。

参考

1、swoole_event_add
https://wiki.swoole.com/wiki/...
2、Client
https://wiki.swoole.com/wiki/...
3、利用swoole_process和eventloop实现php异步编程
https://segmentfault.com/a/11...
4、关于swoole_process的一些使用疑惑
https://group.swoole.com/ques...
5、swoole多进程操作
https://blog.csdn.net/koastal...
6、swoole教程第一节:进程管理模块(Process)-中(消息队列)
https://segmentfault.com/a/11...
7、PHP编程中尝试程序并发的几种方式总结
http://www.jb51.net/article/8...
8、1.8.0 使用内置Http异步客户端
https://wiki.swoole.com/wiki/...
9、异步Http/WebSocket客户端
https://wiki.swoole.com/wiki/...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/28639.html

相关文章

  • swoole_process父子进程管道通信案例

    摘要:话不多说直接上代码创建的子进程获取异步获取更高性能启动子进程子进程处理逻辑异步非阻塞网关连接失败读取父进程管道消息父进程获取子进程的管道消息子进程消息子进程的客户端可以忽略不计,本只是 话不多说直接上代码 创建的子进程: public function __construct() { $this->redis = Container::get(SwooleR...

    Cheng_Gang 评论0 收藏0
  • Swoole 源码分析——Async 异步事件系统 Swoole_Event

    摘要:利用将传入的转为文件描述符新建对象,并对其设置文件描述符读写回调函数检测是否存在,并对其进行初始化。如果传入在中不存在返回,用于修改事件监听的回调函数和掩码。异常事件回调函数当发现套接字发生错误后,就会自动删除该套接字的监听。 前言 对于异步的任务来说,Server 端的 master 进程与 worker 进程会自动将异步的事件添加到 reactor 的事件循环中去,task_wor...

    stefanieliang 评论0 收藏0
  • 基于Swoole和Redis实现的并发队列处理系统

    摘要:大家知道,一个消息队列处理系统主要分为两大部分消费者和生产者。任务系统实时的对任务队列进行,出来一个任务就一个子进程,由子进程完成具体的任务逻辑。新的设计为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。 背景 由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。 大家知道,一个消息队列...

    booster 评论0 收藏0
  • 使用 swoole_process 实现 PHP 进程池

    摘要:本文使用与完成一个的进程池,并且支持动态创建新进程。接着遍历所有的进程,并且加入中,设置可读事件,用于接收子进程的空闲信号。最后每隔一秒向进程投递任务。由于只模拟了十次任务,则第十个任务完成之后在父进程中发送使所有子进程退出。 swoole_process 主要是用来代替 PHP 的 pcntl 扩展。我们知道 pcntl 是用来进行多进程编程的,而 pcntl 只提供了 fork 这...

    Andrman 评论0 收藏0

发表评论

0条评论

Cympros

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<