摘要:大家知道,一个消息队列处理系统主要分为两大部分消费者和生产者。任务系统实时的对任务队列进行,出来一个任务就一个子进程,由子进程完成具体的任务逻辑。新的设计为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。
背景
由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。
大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。
在我们的系统中,主系统作为生产者,任务系统作为消费者。
具体的工作流程如下: 1、主系统将需要需要处理的任务名称+任务参数push到队列中。 2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。
具体代码如下:
/** * 启动守护进程 */ public function runAction() { Tools::log_message("ERROR", "daemon/run" . " | action: restart", "daemon-"); while (true) { $this->fork_process(); } exit; } /** * 创建子进程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo "* Process {$pid} was created "; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo " * Sub process: {$pid} exited with {$status}"; //Tools::log_message("INFO", "daemon/run succ" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid ); } else { Tools::log_message("ERROR", "daemon/run fail" . "|status:" . $status . "|pid:" . $ppid . "|childpid:" . $pid, "daemon-"); } } } /** * 业务任务队列处理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = "_task_" . $data["worker"]; $class_name = isset($data["class"]) ? $data["class"] : "TaskproModel"; $params = $data["params"]; $class = new $class_name(); $class->$worker($params); return TRUE; }
这是一个简单的任务处理系统。
通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。
但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。
这样很稳定。
但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!
第一个问题还好,但第二个问题就很严重。
当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。
新的设计
为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。
因为在PHP7之前不支持多线程,所以我们采用多进程。
从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。
显然这是不合适的。
我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。
遇到的问题
1、如何控制最大进程数
这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。
自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?
可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!
所以,这里就需要了解一个知识点:信号。
具体的可以自行Google,这里直接看代码。
// install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler"));
这就安装了一个信号处理器。当然还缺少一点。
declare(ticks = 1);
declare是一个控制结构语句,具体的用法也请去Google。
这句代码的意思就是每执行一条低级语句就调用一次信号处理器。
这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。
2、如何解决进程残留
在多进程开发中,如果处理不当就会导致进程残留。
为了解决进程残留,必须得将子进程回收。
那么如何对子进程进行回收就是一个技术点了。
在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。
但我们是基于Redis的brpop的,而brpop是阻塞的。
这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。
这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。
进程回收也放到信号处理器中去。
新系统的评估
pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。
所以这里采用Swoole扩展中的Process。
具体代码如下:
declare(ticks = 1); class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = "task:wing"; //待处理队列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler")); set_time_limit(0); ini_set("default_socket_timeout", -1); //队列处理不超时,解决redis报错:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect("redis.master.host",6379); return $rds; } public function process(swoole_process $worker){// 第一个处理 $GLOBALS["worker"] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS["worker"]; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo "From Master: $recv "; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ "abc" => $i, "timestamp" => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){ // echo " now we de have $this->child child processes "; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待 if (!$data_pop){ continue; } echo " Starting new child | now we de have $this->child child processes "; $this->child++; $process = new swoole_process([$this, "process"]); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) { // echo "Recive: $signo "; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) { // echo "PID={$ret["pid"]} "; $this->child--; } } } }
最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/30967.html
摘要:下文如无特殊声明将使用进程同时表示进程线程。收到数据后服务器程序进行处理然后使用向客户端发送响应。现在各种高并发异步的服务器程序都是基于实现的,比如。 并发 IO 问题一直是服务器端编程中的技术难题,从最早的同步阻塞直接 Fork 进程,到 Worker 进程池/线程池,到现在的异步IO、协程。PHP 程序员因为有强大的 LAMP 框架,对这类底层方面的知识知之甚少,本文目的就是详细介...
摘要:在版本中我们将的进程管理模块封装成了类,现在可以在代码中使用的进程管理器了。提供的进程管理器来自于,经过大量生产项目验证,稳定性和健壮性都非常高。三任务投递进程管理器自带了消息队列和消息投递的支持。 在Swoole-2.1.2版本中我们将Server的进程管理模块封装成了PHP类,现在可以在PHP代码中使用Swoole的进程管理器了。 在实际项目中经常需要写一些长期运行的脚本,如基于r...
摘要:易用稳定,本次想通过对的学习和个人解析,吸收框架的思想和设计知识,加强自己对的认知和理解。当然,笔者能力水平有限,后续的文章如有错误,还请指出和谅解。目录如下后续添加文章都会记录在此服务启动过程以及主体设计流程源码解析 前言 swoole是什么?官网的原话介绍是这样的: Swoole 使用纯 C 语言编写,提供了 PHP 语言的异步多线程服务器,异步 TCP/UDP 网络客户端,异步 ...
摘要:基于扩展实现真正的数据库连接池这种方案中,项目占用的连接数仅仅为。一种是连接暂时不再使用,其占用状态解除,可以从使用者手中交回到空闲队列中这种我们称为连接的归队。源码剖析系列目录 作者:bromine链接:https://www.jianshu.com/p/1a7...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft Github: https:...
摘要:为语言提供了强大的协程编程模式。提供的协程语法借鉴自,在此向开发组致敬协程可以与很好地互补。并发执行使用创建协程,可以让和两个函数变成并发执行。协程需要拿到请求的结果。 Swoole4为PHP语言提供了强大的CSP协程编程模式。底层提供了3个关键词,可以方便地实现各类功能。 Swoole4提供的PHP协程语法借鉴自Golang,在此向GO开发组致敬 PHP+Swoole协程可以与...
阅读 1703·2021-09-26 09:55
阅读 3740·2021-09-22 15:31
阅读 7453·2021-09-22 15:12
阅读 2222·2021-09-22 10:02
阅读 4696·2021-09-04 16:40
阅读 1075·2019-08-30 15:55
阅读 3032·2019-08-30 12:56
阅读 1824·2019-08-30 12:44