资讯专栏INFORMATION COLUMN

PHP多进程系列笔记(五)

qianfeng / 2402人阅读

摘要:消息队列更常见的用途是主进程分配任务,子进程消费执行。子进程前面加了个,这是为了防止父进程还未往消息队列中加入内容直接退出。

前面几节都是讲解pcntl扩展实现的多进程程序。本节给大家介绍swoole扩展的swoole_process模块。

swoole多进程

swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。

首先,确保安装的swoole版本大于1.7.2:

$ php --ri swoole

swoole

swoole support => enabled
Version => 1.10.1
注意:swoole_process在最新的1.8.0版本已经禁止在Web环境中使用了,所以也只能支持命令行。

swoole提供的多进程扩展基本功能和pcntl提供的一样,但swoole更易简单上手,并且提供了:

默认基于unixsock的进程间通信;

支持消息队列作为进程间通信;

基于signalfd和eventloop处理信号,几乎没有任何额外消耗;

高精度微秒定时器;

配合swoole_event模块,创建的PHP子进程可以异步的事件驱动模式

swoole_process 模块提供的方法(Method)主要分为四部分:

基础方法

swoole_process::__construct
swoole_process->start
swoole_process->name
swoole_process->exec
swoole_process->close
swoole_process->exit
swoole_process::kill
swoole_process::wait
swoole_process::daemon
swoole_process::setAffinity

管道通信

swoole_process->write
swoole_process->read
swoole_process->setTimeout
swoole_process->setBlocking

消息队列通信

swoole_process->useQueue
swoole_process->statQueue
swoole_process->freeQueue
swoole_process->push
swoole_process->pop

信号与定时器

swoole_process::signal
swoole_process::alarm
基础应用

本例实现的是tcp server,特性:

多进程处理客户端连接

子进程退出,Master进程会重新创建一个

支持事件回调

主进程退出,子进程在干完手头活后退出

mpid = $id = getmypid();   
            echo time()." Master process, pid {$id}
"; 

            //创建tcp server
            $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
            if(!$this->socket) exit("start server err: $errstr --- $errno");

            for($i=0; $istart_worker_process();
            }
    
            echo "waiting client...
";
    
            //Master进程等待子进程退出,必须是死循环
            while(1){
                foreach($this->pids as $k=>$pid){
                    if($pid){
                        $res = swoole_process::wait(false);
                        if ( $res ){
                            echo time()." Worker process $pid exit, will start new... 
";
                            $this->start_worker_process();
                            unset($this->pids[$k]);
                        }
                    }
                }
                sleep(1);//让出1s时间给CPU
            }
        }, false, false); //不启用管道通信
        swoole_process::daemon(); //守护进程
        $process->start();//注意:start之后的变量子进程里面是获取不到的
    }

    /**
     * 创建worker进程,接受客户端连接
     */
    private function start_worker_process(){
        $process = new swoole_process(function(swoole_process $worker){
            $this->acceptClient($worker);
        }, false, false);
        $pid = $process->start();
        $this->pids[] = $pid;
    }

    private function acceptClient(&$worker)
    {
        //子进程一直等待客户端连接,不能退出
        while(1){
            
            $conn = stream_socket_accept($this->socket, -1);
            if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件

            //开始循环读取消息
            $recv = ""; //实际收到消息
            $buffer = ""; //缓冲消息
            while(1){
                $this->checkMpid($worker);
                
                $buffer = fread($conn, 20);

                //没有收到正常消息
                if($buffer === false || $buffer === ""){
                    if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件
                    break;//结束读取消息,等待下一个客户端连接
                }

                $pos = strpos($buffer, "
"); //消息结束符
                if($pos === false){
                    $recv .= $buffer;                            
                }else{
                    $recv .= trim(substr($buffer, 0, $pos+1));

                    if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件

                    //客户端强制关闭连接
                    if($recv == "quit"){
                        echo "client close conn
";
                        fclose($conn);
                        break;
                    }

                    $recv = ""; //清空消息,准备下一次接收
                }
            }
        }
    }

    //检查主进程是否存在,若不存在子进程在干完手头活后退出
    public function checkMpid(&$worker){
        if(!swoole_process::kill($this->mpid,0)){
            $worker->exit();
            // 这句提示,实际是看不到的.需要写到日志中
            echo "Master process exited, I [{$worker["pid"]}] also quit
";
        }
    }

    function __destruct() {
        @fclose($this->socket);
    }
}

$server =  new TcpServer();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "
";
    fwrite($conn,"conn success
");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "
";
    fwrite($conn,"received ".$msg."
");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "
";
    fwrite($conn,"onClose "."
");
};

$server->run();

运行后可以使用telnet连接:

telnet 127.0.0.1 9201

由于设置了最大三个子进程,最多只能接受3个客户端连接。

进程间通信

前面讲解的例子里,主进程和子进程直接是没有直接的数据交互的。如果主进程需要得到的来自子进程的反馈,或者子进程接受来自主进程的数据,那么就需要进程间通信了。

swoole内置了管道通信和消息队列通信。

管道通信

管道通信主要是数据传输:一个进程需要将数据发送给另外一个进程。

这个swoole封装后,使用非常简单:

read();

        ob_start();
        passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
        $return = ob_get_clean() ? : " ";
        $return = trim($return).". worker pid:".$worker->pid."
";
        
        // $worker->write($return);//写入数据到管道
        echo $return;//写入数据到管道。注意:子进程里echo也是写入到管道
    }, true); //第二个参数为true,启用管道通信
    $pid = $process->start();
    $workers[$pid] = $process;  
}

foreach($workers as $pid=>$worker){
    $worker->write("whoami"); //通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的
    $recv = $worker->read();//同步阻塞读取管道数据
    echo "recv result: $recv";
}

//回收子进程
while(count($workers)){
    // echo time(). "
";
    foreach($workers as $pid=>$worker){
        $ret = swoole_process::wait(false);
        if($ret){
            echo "worker exit: $pid
";
            unset($workers[$pid]);
        }
    }
}

运行:

$ php swoole_process_pipe.php 
recv result: Linux
recv result: 2018年 06月 24日 星期日 16:18:01 CST
recv result: yjc
worker exit: 14519
worker exit: 14522
worker exit: 14525

注意点:
1、管道数据读取是同步阻塞的;上面的例子里如果子进程里再加一句$worker->read(),会一直阻塞。可以使用swoole_event_add将管道加入到事件循环中,变为异步模式。
2、子进程里的输出(例如echo)与write效果相同。
3、通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的。

这里额外讲解一下swoole_process::wait()
1、swoole_process::wait()默认是阻塞的, swoole_process::wait(false)则是非阻塞的;
2、swoole_process::wait()阻塞模式调用一次仅能回收一个子进程,非阻塞模式调用一次不一定能当前就能回收子进程;
3、如果不加swoole_process::wait(),主进程又是死循环,主进程退出后会变成僵尸进程。

ps -A -o stat,ppid,pid,cmd | grep -e "^[Zz]"可以查询僵尸进程。

防盗版声明:本文系原创文章,发布于公众号飞鸿影的博客(fhyblog)及博客园,转载需作者同意。


消息队列通信

消息队列与管道有些不一样:消息队列是全局的,所有进程都可以发送、读取。你可以把它看做redis list结构。

消息队列更常见的用途是主进程分配任务,子进程消费执行。

pop()){
            // echo "recv from master: $cmd
";

            ob_start();
            passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
            $return = ob_get_clean() ? : " ";
            $return = "res: ".trim($return).". worker pid: ".$worker->pid."
";
            
            echo $return;
            // sleep(1);
        }

        $worker->exit(0);
    }, false, false); //不创建管道

    $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息队列
    $pid = $process->start();
    $workers[$pid] = $process;
}

//由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可
$worker = current($workers);
for ($i=0; $i<3; $i++) {
    $worker->push("whoami"); //发送消息
}


//回收子进程
while(count($workers)){
    foreach($workers as $pid=>$worker){
        $ret = swoole_process::wait();
        if($ret){
            echo "worker exit: $pid
";
            unset($workers[$pid]);
        }
    }
}

运行结果:

$ php swoole_process_quene.php 
res: yjc. worker pid: 15885
res: yjc. worker pid: 15886
res: yjc. worker pid: 15887
worker exit: 15885
worker exit: 15886
worker exit: 15887

注意点:
1、所有进程共享使用一个消息队列;
2、消息队列的读取操作是阻塞的,可以在useQueue的时候第2个参数mode改为2 | swoole_process::IPC_NOWAIT,则是异步的。mode仅仅设置为2是阻塞的,示例里去掉swoole_process::IPC_NOWAIT后读取消息的while会死循环。
3、子进程前面加了个sleep(1);,这是为了防止父进程还未往消息队列中加入内容直接退出。
4、子进程末尾也加了sleep,这是为了防止一个进程把所有消息都消费完了,实际应用需要去掉。

信号与定时器

swoole_process::alarm支持微秒定时器:

 5) {
        //清除定时器
        swoole_process::alarm(-1);

        //退出进程
        swoole_process::kill(getmypid());
        
    }
}

//安装信号
swoole_process::signal(SIGALRM, "ev_timer");

//触发定时器信号:单位为微秒。如果为负数表示清除定时器
swoole_process::alarm(100 * 1000);//100ms

echo getmypid()."
"; //该句会顺序执行,后续无需使用while循环防止进程直接退出

运行:

$ php swoole_process_alarm.php 
13660
#0    alarm
#1    alarm
#2    alarm
#3    alarm
#4    alarm
#5    alarm
注:alarm不能和SwooleTimer同时使用。
参考

1、Process-Swoole-Swoole文档中心
https://wiki.swoole.com/wiki/...


欢迎关注公众号及时获取最新文章推送!


推荐!每月仅需$2.5,即可拥有配置SSD的VPS!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/28922.html

相关文章

  • PHP进程系列笔记(一)

    摘要:用于创建子进程。该函数阻塞当前进程,只到当前进程的一个子进程退出或者收到一个结束当前进程的信号。注意处需要注意子进程需要防止子进程也进入循环。如果没有,最终创建的子进程不只个。 本系列文章将向大家讲解pcntl_*系列函数,从而更深入的理解进程相关知识。 PCNTL在PHP中进程控制支持默认是关闭的。您需要使用 --enable-pcntl 配置选项重新编译PHP的 CGI或CLI版本...

    ddongjian0000 评论0 收藏0
  • PHP进程系列笔记(二)

    摘要:任何进程在退出前使用退出都会变成僵尸进程用于保存进程的状态等信息,然后由进程接管。这时候就算手动结束脚本程序也无法关闭这个僵尸子进程了。那么子进程结束后,没有回收,就产生僵尸进程了。本小节我们通过安装信号处理函数来解决僵尸进程问题。 上一篇文章讲解了pcntl_fork和pcntl_wait两个函数的使用,本篇继续讲解PHP多进程相关新知识。 僵尸(zombie)进程 这里说下僵尸进程...

    CatalpaFlat 评论0 收藏0
  • PHP进程系列笔记(四)

    摘要:本节主要讲解常用函数和进程池的概念,也会涉及到守护进程的知识。所以任何时候,建议预先创建好进程,也就是使用进程池的方式实现。 本节主要讲解Posix常用函数和进程池的概念,也会涉及到守护进程的知识。本节难度较低。 Posix常用函数 posix_kill 向指定pid进程发送信号。成功时返回 TRUE , 或者在失败时返回 FALSE 。 bool posix_kill ( int $...

    Cc_2011 评论0 收藏0
  • PHP 进程系列笔记(三)

    摘要:本节讲解几个多进程的实例。新开终端,我们使用命令查看进程可以看到个进程个主进程,个子进程。使用命令结束子进程,主进程会重新拉起一个新的子进程。 本节讲解几个多进程的实例。 多进程实例 Master-Worker结构 下面例子实现了简单的多进程管理: 支持设置最大子进程数 Master-Worker结构:Worker挂掉,Master进程会重新创建一个

    focusj 评论0 收藏0
  • Swoole笔记

    摘要:超过此数量后,新进入的连接将被拒绝。表示连接最大允许空闲的时间。当出错时底层会认为是恶意连接,丢弃数据并强制关闭连接。在启动时自动将进程的写入到文件,在关闭时自动删除文件。 配置说明 $server->set(array( daemonize => true, log_file => /www/log/swoole.log, reactor_num => 2, ...

    zgbgx 评论0 收藏0

发表评论

0条评论

qianfeng

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<