资讯专栏INFORMATION COLUMN

PHP 多任务协程处理

jayzou / 899人阅读

摘要:所以本文将探讨多任务协程这方面的内容。我们仅需在处理前进行检测即可。方法用于执行任务,方法用于让调度程序知道何时终止运行。它是一种先进先出数据结构,能够确保每个任务都能够获取足够的处理时间。

本文首发于 PHP 多任务协程处理,转载请注明出处!

上周 有幸和同事一起在 SilverStripe 分享最近的工作事宜。今天我计划分享 PHP 异步编程,不过由于上周我聊过 ReactPHP;我决定讨论一些不一样的内容。所以本文将探讨多任务协程这方面的内容。

另外我还计划把这个主题加入到我正在筹备的一本 PHP 异步编程的图书中。虽然这本书相比本文来说会涉及更多细节,但我觉得本文依然具有实际意义!

那么,开始吧!
new MyIterator(

这就是本文我们要讨论的问题。不过我们会从更简单更熟悉的示例开始。

一切从数组开始

我们可以通过简单的遍历来使用数组:

$array = ["foo", "bar", "baz"];
 
foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "
";
}
 
for ($i = 0; $i < count($array); $i++) {
    print "item: " . $i . "|" . $array[$i] . "
";
}

这是我们日常编码所依赖的基本实现。可以通过遍历数组获取每个元素的键名和键值。

当然,如果我们希望能够知道在何时可以使用数组。PHP 提供了一个方便的内置函数:

print is_array($array) ? "yes" : "no"; // yes
类数组处理

有时,我们需要对一些数据使用相同的方式进行遍历处理,但它们并非数组类型。比如对 DOMDocument 类进行处理:

$document = new DOMDocument();
$document->loadXML("
"); $elements = $document->getElementsByTagName("div"); print_r($elements); // DOMNodeList Object ( [length] => 1 )

这显然不是一个数组,但是它有一个 length 属性。我们能像遍历数组一样,对其进行遍历么?我们可以判断它是否实现了下面这个特殊的接口:

print ($elements instanceof Traversable) ? "yes" : "no"; // yes

这真的太有用了。它不会导致我们在遍历非可遍历数据时触发错误。我们仅需在处理前进行检测即可。

不过,这会引发另外一个问题:我们能否让自定义类也拥有这个功能呢?回答是肯定的!第一个实现方法类似如下:

class MyTraversable implements Traversable
{
    //  在这里编码...
}

如果我们执行这个类,我们将看到一个错误信息:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate
Iterator(迭代器)

我们无法直接实现 Traversable,但是我们可以尝试第二种方案:

class MyTraversable implements Iterator
{
    //  在这里编码...
}

这个接口需要我们实现 5 个方法。让我们完善我们的迭代器:

class MyTraversable implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        return $this->data[$this->index++];
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index < count($this->data);
    }
}

这边我们需要注意几个事项:

我们需要存储构造器方法传入的 $data 数组,以便后续我们可以从中获取它的元素。

还需要一个内部索引(或指针)来跟踪 currentnext 元素。

rewind() 仅仅重置 index 属性,这样 current()next() 才能正常工作。

键名并非只能是数字类型!这里使用数组索引是为了保证示例足够简单。

我们可以向下面这样运行这段代码:

$iterator = new MyTraversable(["foo", "bar", "baz"]);
 
foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "
";
}

这看起来需要处理太多工作,但是这是能够像数组一样使用 foreach/for 功能的一个简洁实现。

IteratorAggregate(聚合迭代器)

还记得第二个接口抛出的 Traversable 异常么?下面看一个比实现 Iterator 接口更快的实现吧:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}

这里我们作弊了。相比于实现一个完整的 Iterator,我们通过 ArrayIterator() 装饰。不过,这相比于通过实现完整的 Iterator 简化了不少代码。

兄弟莫急!先让我们比较一些代码。首先,我们在不使用生成器的情况下从文件中读取每一行数据:

$content = file_get_contents(__FILE__);

$lines = explode("
", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "
";
}

这段代码读取文件自身,然后会打印出每行的行号和代码。那么为什么我们不使用生成器呢!

function lines($file) {
    $handle = fopen($file, "r");

    while (!feof($handle)) {
        yield trim(fgets($handle));
    }

    fclose($handle);
}

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "
";
}

我知道这看起来更加复杂。不错,不过这是因为我们没有使用 file_get_contents() 函数。一个生成器看起来就像是一个函数,但是它会在每次获取到 yield 关键词是停止运行。

生成器看起来有点像迭代器:

print_r(lines(__FILE__)); // Generator Object ( )

尽管它不是迭代器,它是一个 Generator。它的内部定义了什么方法呢?

print_r(get_class_methods(lines(__FILE__)));
 
// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )
如果你读取一个大文件,然后使用 memory_get_peak_usage(),你会注意到生成器的代码会使用固定的内存,无论这个文件有多大。它每次进度去一行。而是用 file_get_contents() 函数读取整个文件,会使用更大的内存。这就是在迭代处理这类事物时,生成器的能给我们带来的优势!
Send(发送数据)

可以将数据发送到生成器中。看下下面这个生成器:

current() . "
"; // foo
注意这里我们如何在 call_user_func() 函数中封装生成器函数的?这里仅仅是一个简单的函数定义,然后立即调用它获取一个新的生成器实例...

我们已经见过 yield 的用法。我们可以通过扩展这个生成器来接收数据:

$generator = call_user_func(function() {
    $input = (yield "foo");

    print "inside: " . $input . "
";
});

print $generator->current() . "
";

$generator->send("bar");

数据通过 yield 关键字传入和返回。首先,执行 current() 代码直到遇到 yield,返回 foosend() 将输出传入到生成器打印输入的位置。你需要习惯这种用法。

抛出异常(Throw)

由于我们需要同这些函数进行交互,可能希望将异常推送到生成器中。这样这些函数就可以自行处理异常。

看看下面这个示例:

$multiply = function($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30

现在让我们将它封装到另一个函数中:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30

这里我们通过一个普通闭包将乘法生成器封装起来。现在让我们验证无效参数:

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate("multiply", 5, "foo"); // PHP Fatal error...

如果我们希望能够通过生成器处理异常?我们怎样才能将异常传入生成器呢!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
print $calculate("multiply", 5, "foo"); // PHP Fatal error...

棒呆了!我们不仅可以像迭代器一样使用生成器。还可以通过它们发送数据并抛出异常。它们是可中断和可恢复的函数。有些语言把这些函数叫做……

我们可以使用协程(coroutines)来构建异步代码。让我们来创建一个简单的任务调度程序。首先我们需要一个 Task 类:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Task 是普通生成器的装饰器。我们将生成器赋值给它的成员变量以供后续使用,然后实现一个简单的 run()finished() 方法。run() 方法用于执行任务,finished() 方法用于让调度程序知道何时终止运行。

然后我们需要一个 Scheduler 类:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    pulic function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

Scheduler 用于维护一个待执行的任务队列。run() 会弹出队列中的所有任务并执行它,直到运行完整个队列任务。如果某个任务没有执行完毕,当这个任务本次运行完成后,我们将再次入列。

SplQueue 对于这个示例来讲再合适不过了。它是一种 FIFO(先进先出:fist in first out) 数据结构,能够确保每个任务都能够获取足够的处理时间。

我们可以像这样运行这段代码:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "
";
        yield;
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "
";
        yield;
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();

运行时,我们将看到如下执行结果:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5

这几乎就是我们想要的执行结果。不过有个问题发生在首次运行每个任务时,它们都执行了两次。我们可以对 Task 类稍作修改来修复这个问题:

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

我们需要调整首次 run() 方法调用,从生成器当前有效的指针读取运行。后续调用可以从下一个指针读取运行...

有些人基于这个思路实现了一些超赞的类库。我们来看看其中的两个...

RecoilPHP

RecoilPHP 是一套基于协程的类库,它最令人印象深刻的是用于 ReactPHP 内核。可以将事件循环在 RecoilPHP 和 RecoilPHP 之间进行交换,而你的程序无需架构上的调整。

我们来看一下 ReactPHP 异步 DNS 解决方案:

function resolve($domain, $resolver) {
    $resolver
        ->resolve($domain)
        ->then(function ($ip) use ($domain) {
            print "domain: " . $domain . "
";
            print "ip: " . $ip . "
";
        }, function ($error) {            
            print $error . "
";
        })
}

function run()
{
    $loop = ReactEventLoopFactory::create();
 
    $factory = new ReactDnsResolverFactory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);
 
    $loop->run();
}
 
run();

resolve() 接收域名和 DNS 解析器,并使用 ReactPHP 执行标准的 DNS 查找。不用太过纠结与 resolve() 函数内部。重要的是这个函数不是生成器,而是一个函数!

run() 创建一个 ReactPHP 事件循环,DNS 解析器(这里是个工厂实例)解析若干域名。同样,这个也不是一个生成器。

想知道 RecoilPHP 到底有何不同?还希望掌握更多细节!

use RecoilRecoil;
 
function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));
 
        print "domain: " . $domain . "
";
        print "ip: " . $ip . "
";
    } catch (Exception $exception) {
        print $exception->getMessage() . "
";
    }
}
 
function run()
{
    $loop = (yield Recoil::eventLoop());
 
    $factory = new ReactDnsResolverFactory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}
 
Recoil::run("run");

通过将它集成到 ReactPHP 来完成一些令人称奇的工作。每次运行 resolve() 时,RecoilPHP 会管理由 $resoler->resolve() 返回的 promise 对象,然后将数据发送给生成器。此时我们就像在编写同步代码一样。与我们在其他一步模型中使用回调代码不同,这里只有一个指令列表。

RecoilPHP 知道它应该管理一个有执行 run() 函数时返回的 yield 数组。RoceilPHP 还支持基于协程的数据库(PDO)和日志库。

IcicleIO

IcicleIO 为了一全新的方案实现 ReactPHP 一样的目标,而仅仅使用协程功能。相比 ReactPHP 它仅包含极少的组件。但是,核心的异步流、服务器、Socket、事件循环特性一个不落。

让我们看一个 socket 服务器示例:

use IcicleCoroutineCoroutine;
use IcicleLoopLoop;
use IcicleSocketClientClientInterface;
use IcicleSocketServerServerInterface;
use IcicleSocketServerServerFactory;
 
$factory = new ServerFactory();
 
$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();
     
    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);
             
            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();
             
            $name = $host . ":" . $port;
             
            try {
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . "connected.
");
                    }
                }
 
                yield $client->write("Welcome " . $name . "!
");
                 
                while ($client->isReadable()) {
                    $data = trim(yield $client->read());
                     
                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!
");
                    } else {
                        $message = $name . ":" . $data . "
";
                        
                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . "disconnected.
");
                }
            }
        }
    );
     
    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));
 
Loop::run();

据我所知,这段代码所做的事情如下:

在 127.0.0.1 和 6000 端口创建一个服务器实例,然后将其传入外部生成器.

外部生成器运行,同时服务器等待新连接。当服务器接收一个连接它将其传入内部生成器。

内部生成器写入消息到 socket。当 socket 可读时运行。

每次 socket 向服务器发送消息时,内部生成器检测消息是否是退出标识。如果是,通知其他 socket。否则,其它 socket 发送这个相同的消息。

打开命令行终端输入 nc localhost 6000 查看执行结果!

该示例使用 SplObjectStorage 跟踪 socket 连接。这样我们就可以向所有 socket 发送消息。

这个话题可以包含很多内容。希望您能看到生成器是如何创建的,以及它们如何帮助编写迭代程序和异步代码。

如果你有问题,可以随时问我。

感谢 Nikita Popov(还有它的启蒙教程 Cooperative multitasking using coroutines (in PHP!) ),Anthony Ferrara 和 Joe Watkins。这些研究工作泽被苍生,给我以写作此篇文章的灵感。关注他们吧,好么?
原文

Co-operative PHP Multitasking

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29045.html

相关文章

  • PHP下的异步尝试二:初识协程

    摘要:如果仅依靠程序自动交出控制的话,那么一些恶意程序将会很容易占用全部时间而不与其他任务共享。多个操作可以在重叠的时间段内进行。 PHP下的异步尝试系列 如果你还不太了解PHP下的生成器,你可以根据下面目录翻阅 PHP下的异步尝试一:初识生成器 PHP下的异步尝试二:初识协程 PHP下的异步尝试三:协程的PHP版thunkify自动执行器 PHP下的异步尝试四:PHP版的Promise ...

    MudOnTire 评论0 收藏0
  • Mix PHP V2 实例:协程池异步邮件发送守护程序

    摘要:消费者开发使用本例时,请确保你使用的编译时开启了本例我们采用的守护程序协程池来完成一个超高性能的邮件发送程序。 去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。...

    lauren_liuling 评论0 收藏0
  • 关于PHP协程与阻塞的思考

    摘要:线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度标准线程是的。以及鸟哥翻译的这篇详细文档我就以他实现的协程多任务调度为基础做一下例子说明并说一下关于我在阻塞方面所做的一些思考。 进程、线程、协程 关于进程、线程、协程,有非常详细和丰富的博客或者学习资源,我不在此做赘述,我大致在此介绍一下这几个东西。 进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系...

    FullStackDeveloper 评论0 收藏0
  • PHP回顾之协程

    摘要:本文先回顾生成器,然后过渡到协程编程。其作用主要体现在三个方面数据生成生产者,通过返回数据数据消费消费者,消费传来的数据实现协程。解决回调地狱的方式主要有两种和协程。重点应当关注控制权转让的时机,以及协程的运作方式。 转载请注明文章出处: https://tlanyan.me/php-review... PHP回顾系列目录 PHP基础 web请求 cookie web响应 sess...

    Java3y 评论0 收藏0
  • IMI 基于 Swoole 开发的协程 PHP 开发框架 常驻内存、协程异步非阻塞

    摘要:介绍是基于开发的协程开发框架,拥有常驻内存协程异步非阻塞等优点。宇润我在年开发并发布了第一个框架,一直维护使用至今,非常稳定,并且有文档。于是我走上了开发的不归路 showImg(https://segmentfault.com/img/bVbcxQH?w=340&h=160); 介绍 IMI 是基于 Swoole 开发的协程 PHP 开发框架,拥有常驻内存、协程异步非阻塞IO等优点。...

    airborne007 评论0 收藏0

发表评论

0条评论

jayzou

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<