资讯专栏INFORMATION COLUMN

使用 swoole_process 实现 PHP 进程池

Andrman / 1072人阅读

摘要:本文使用与完成一个的进程池,并且支持动态创建新进程。接着遍历所有的进程,并且加入中,设置可读事件,用于接收子进程的空闲信号。最后每隔一秒向进程投递任务。由于只模拟了十次任务,则第十个任务完成之后在父进程中发送使所有子进程退出。

swoole_process 主要是用来代替 PHP 的 pcntl 扩展。我们知道 pcntl 是用来进行多进程编程的,而 pcntl 只提供了 fork 这样原始的接口,容易使用错误,并且没有提供进程间通信以及重定向标准输入输出的功能。

而 swoole_process 则提供了比 pcntl 更强大的功能,更易用的API,使PHP在多进程编程方面更加轻松。

本文使用 swoole_process 与 EventLoop 完成一个 php 的进程池,并且支持动态创建新进程。
EventLoop

swoole 有一个 Reactor 线程,这个线程可以说是对 epoll 模型的封装,可以设置 read 事件和 write 事件的监听回调函数。

下面会用到一个函数:

bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);

参数1为一个文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其他 fd(socket_create 创建的资源 , stream_socket_client/fsockopen创建的资源)

参数2为可读事件回调函数

参数3为可写事件回调函数

多进程编程少不了进程之间的通讯,swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe)。那么本文使用的是 pipe 的方式。

下面是一个定时向进程池投递任务的例子。

代码:

process = new swoole_process(array($this, "run"), false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    public function run()
    {
        $this->current_num = $this->min_worker_num;
        //创建所有的worker进程
        for($i = 0; $i < $this->current_num; $i++){
            $process = new swoole_process(array($this, "task_run"), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }

        foreach($this->process_list as $process){
            swoole_event_add($process->pipe, function ($pipe) use ($process){
                $data = $process->read();
                var_dump($data . "空闲");
                //接收子进程处理完成的信息,并且重置为空闲
                $this->process_use[$data] = 0;
            });
        }

        //每秒定时向worker管道投递任务
        swoole_timer_tick(1000 ,function ($timer_id){
            static $index = 0;
            $index = $index + 1;
            $flag = true; //是否新建worker
            foreach ($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //标记为正在使用
                    $this->process_use[$pid] = 1;
                    // 在父进程内调用write,子进程可以调用read接收此数据
                    $this->process_list[$pid]->write($index. "hello");
                    break;
                }
            }

            if($flag && $this->current_num < $this->max_worker_num){
                //没有闲置worker,新建worker来处理
                $process = new swoole_process(array($this, "task_run"), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1;
                $this->process_list[$pid]->write($index. "hello");
                $this->current_num++;
            }
            var_dump("第" .$index. "个任务");
            if($index == 10){
                foreach($this->process_list as $process){
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }

        });
    }

    /**
     * 子进程处理
     * @param $worker
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function($pipe)use($worker){
            $data = $worker->read();
            var_dump($worker->pid . ":" . $data);
            if($data == "exit"){
                $worker->exit();
                exit;
            }
            //模拟耗时任务
            sleep(5);
            //告诉主进程处理完成
            //在子进程内调用write,父进程可以调用read接收此数据
            $worker->write($worker->pid);
        });
    }

}

new ProcessPool();

首先定义几个重要的属性:

$process_list :Worker 进程数组

$process_use:正在被使用的进程

$min_worker_num :最少进程数量

$max_worker_num :最多进程数量

$current_num :当前进程数量

$process : 主进程

在实例化的时候创建主进程,并且运行 run 方法,在 run 方法里面先创建所有的 worker 进程,并且设置为空闲状态。

接着遍历所有的 worker 进程,并且加入 EventLoop 中,设置可读事件,用于接收子进程的空闲信号。

最后每隔一秒向 worker 进程投递任务。动态扩充进程池则在这里实现,如果没有闲置的进程,而此时又有新的任务,则需要动态创建一个新的进程并且置为繁忙状态。由于只模拟了十次任务,则第十个任务完成之后在父进程中发送 exit 使所有子进程退出。

运行效果与图解:

参考链接:

https://wiki.swoole.com/wiki/...
https://opso.coding.me/2018/0...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29584.html

相关文章

  • Swoole 源码分析——进程管理 Swoole_Process

    摘要:清空主进程残留的定时器与信号。设定为执行回调函数如果在回调函数中调用了异步系统,启动函数进行事件循环。因此为了区分两者,规定并不允许两者同时存在。 前言 swoole-1.7.2 增加了一个进程管理模块,用来替代 PHP 的 pcntl 扩展。 PHP自带的pcntl,存在很多不足,如 pcntl 没有提供进程间通信的功能 pcntl 不支持重定向标准输入和输出 pcntl 只...

    pepperwang 评论0 收藏0
  • PHP进程系列笔记(五)

    摘要:消息队列更常见的用途是主进程分配任务,子进程消费执行。子进程前面加了个,这是为了防止父进程还未往消息队列中加入内容直接退出。 前面几节都是讲解pcntl扩展实现的多进程程序。本节给大家介绍swoole扩展的swoole_process模块。 swoole多进程 swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。 首先,确保安装的swoole...

    qianfeng 评论0 收藏0
  • Swoole笔记(一)

    摘要:修复添加超过万个以上定时器时发生崩溃的问题增加模块,下高性能序列化库修复监听端口设置无效的问题等。线程来处理网络事件轮询,读取数据。当的三次握手成功了以后,由这个线程将连接成功的消息告诉进程,再由进程转交给进程。此时进程触发事件。 本文示例代码详见:https://github.com/52fhy/swoo...。 简介 Swoole是一个PHP扩展,提供了PHP语言的异步多线程服务器...

    SHERlocked93 评论0 收藏0
  • Swoole 4.0 正式版,面向生产环境的 PHP 协程引擎

    摘要:在禁止场景中使用协程会出现各种莫名其妙的问题发生。限制了协程的应用范围。新版本基于汇编代码实现了全新的协程内核。实现了对所有语法的支持。稳定性和健壮性均已达到工业级的水准。完全可用于大型项目的生产环境中。 Swoole虽然在2016年就支持了协程特性,但由于底层是基于setjmp/longjmp实现的stackless方案。因此在某些场景下,如call_user_func、array_...

    Zack 评论0 收藏0

发表评论

0条评论

Andrman

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<