摘要:前些时间我们发布了实例协程池异步邮件发送守护程序范例,这一次我们提供一个使用大厂通过协程化来并行执行短信发送任务,本文是一个代码简单性能极强的范例。
前些时间我们发布了 Mix PHP V2 实例:协程池异步邮件发送守护程序 范例,这一次我们提供一个使用大厂 SDK 通过 Swoole Hook 协程化来并行执行短信发送任务,本文是一个代码简单、IO 性能极强的范例。
请先升级到 mix-framework >= v2.0.5。
本范例依然使用消息队列的方式接收短信发送任务,消息中间件使用:
redis
生产者通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接 $redis = new Redis(); if (!$redis->connect("127.0.0.1", 6379)) { throw new Exception("Redis connect failed."); } $redis->auth(""); $redis->select(0); // 投递任务 for($i = 0; $i < 3; $i++){ $data = [ "phone" => "***", "templateCode" => "SMS_***", "templateParam" => ["code" => 123456], ]; $redis->lpush("queue:sms", serialize($data)); }消费者
使用的是 ali 云的短信服务,查看官方 PHP SDK 文档 ,使用的库为:
composer require alibabacloud/client
通过查看该库的 composer 依赖文件,我们得知该库基于 guzzlehttp 开发,因为 Mix PHP 提供了无需修改代码就可 Hook Guzzle 库可在协程中使用的工具 Mix PHP V2 生态:让 Guzzle 支持 Swoole 的 Hook 协程,所以能基本确定该库可在 Swoole 协程中使用。
首先我们安装 https://github.com/mix-php/guzzle-hook 让 alibabacloud/client 可在协程中使用:
composer require mix/guzzle-hook
然后在项目的 composer.json 文件中增加 extra 配置项,如下:
"extra": { "include_files": [ "vendor/mix/guzzle-hook/src/functions_include.php" ] }
更新自动加载:
composer dump-autoload
下面我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的短信发送程序。
首先我们在配置 applications/console/config/main.php 中注册一个命令:
// 命令 "commands" => [ "smser" => [ "Smser", "description" => "SMS send daemon demo.", "options" => [ [["d", "daemon"], "description" => "Run in the background"], ], ], ],
注册的命令中指定的 Smser 命令类,接下来我们编写一个 SmserCommand 类:
applications/console/src/Commands/SmserCommand.php
*/ class SmserCommand { const ACCESS_KEY = "***"; const ACCESS_SECRET = "***"; /** * 退出 * @var bool */ public $quit = false; /** * 主函数 */ public function main() { // 守护处理 $daemon = Flag::bool(["d", "daemon"], false); if ($daemon) { ProcessHelper::daemon(); } // 捕获信号 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 设置ali云全局参数 AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId("cn-hangzhou")->asDefaultClient(); // 手动关闭Swoole文件Hook,因为ali云依赖的uuid库有文件hook协程兼容问题,Swoole 4.4已经适配该问题 Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE); // 协程池执行任务 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ "jobQueue" => $jobQueue, "maxWorkers" => $maxWorkers, ]); $dispatch->start(SmserWorker::class); // 投放任务 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop(["queue:sms"], 3); } catch (Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop命令最后一个键才是值 $jobQueue->push($data); } }); // 等待事件 Event::wait(); } }
从 $data = $redis->brPop(["queue:sms"], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisor、pm2 等工具重启守护进程。
上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的发送代码逻辑就在 SmserWorker 类中:
applications/console/src/Libraries/SmserWorker.php
*/ class SmserWorker extends AbstractWorker implements WorkerInterface { /** * 邮件发送器 * @var Smser */ public $smser; /** * 初始化事件 */ public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 实例化一些需重用的对象 $this->smser = new Smser(); } /** * 处理 * @param $data */ public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $result = $this->smser->send($data["phone"], $data["templateCode"], $data["templateParam"]); app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ["result" => json_encode($result, JSON_UNESCAPED_UNICODE)])); } catch (Throwable $e) { app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ["error" => $e->getMessage()])); } } }
由以上代码可见,Worker 在初始化时,新增了一个 Smser 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Smser 发送程序:
applications/console/src/Libraries/Smser.php
*/ class Smser { /** * 配置信息 */ const SIGN_NAME = "***"; /** * Smser constructor. */ public function __construct() { // 开启协程钩子 Coroutine::enableHook(); } /** * 发送 * @param $phone * @param $templateCode * @param $templateParam * @return array * @throws ClientException * @throws ServerException */ public function send($phone, $templateCode, $templateParam) { $result = AlibabaCloud::rpc() ->product("Dysmsapi") // ->scheme("https") // https | http ->version("2017-05-25") ->action("SendSms") ->method("POST") ->options([ "query" => [ "PhoneNumbers" => $phone, "SignName" => static::SIGN_NAME, "TemplateCode" => $templateCode, "TemplateParam" => json_encode($templateParam), ], ]) ->request(); return $result->toArray(); } }
以上就完成了全部的代码逻辑,现在我们开始测试,先启动消费者守护程序:
[root@localhost bin]# ./mix-console smser
将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):
[root@localhost bin]# php /tmp/push.php
消费者守护程序结果:
[root@localhost bin]# ./mix-console smser [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}
命令行终端打印了发送成功的日志,发送完成。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/31565.html
摘要:消费者开发使用本例时,请确保你使用的编译时开启了本例我们采用的守护程序协程池来完成一个超高性能的邮件发送程序。 去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。...
摘要:主函数查询数据不手动释放的连接不会归还连接池,会在析构时丢弃执行结果为,说明是并行执行的。主函数查询数据即便抛出了异常,仍然能执行到,没有导致内的一直处于阻塞状态。主函数一次性定时持续定时停止定时 协程 Mix PHP V2 基于 Swoole 4 的 PHP Stream Hook 协程技术开发,协程使用方式与 Golang 几乎一致,包括框架封装的协程池、连接池、命令行处理都大量参...
摘要:是一个非常流行的的客户端,现在各大厂的也都开始基于开发,因为只支持的协程,而默认是使用扩展的,所以开发了,能在不修改源码的情况下让协程化。 Guzzle 是一个非常流行的 PHP 的 HTTP 客户端,现在各大厂的 SDK 也都开始基于 Guzzle 开发,因为 Swoole 只支持 PHP Stream 的协程 Hook ,而 Guzzle 默认是使用 cURL 扩展的,所以 Mix...
摘要:之前过行代码实现通用协程池今天看了下相关文档,用也实现了一个,由于没有的,所以实现的有点简单,但是实用性还可以,通过工厂函数实现了通用性。官方的协程池是用只能用在。因为协程池代码层耦合了实例化逻辑。 之前过golang40行代码实现通用协程池 今天看了下swoole相关文档,用PHP也实现了一个,由于swoole没有golang的select,所以实现的有点简单,但是实用性还可以,通过...
摘要:所以与多线程相比,线程的数量越多,协程性能的优势越明显。值得一提的是,在此过程中,只有一个线程在执行,因此这与多线程的概念是不一样的。 真正有知识的人的成长过程,就像麦穗的成长过程:麦穗空的时候,麦子长得很快,麦穗骄傲地高高昂起,但是,麦穗成熟饱满时,它们开始谦虚,垂下麦芒。 ——蒙田《蒙田随笔全集》 上篇论述了关于python多线程是否是鸡肋的问题,得到了一些网友的认可,当然也有...
阅读 2166·2021-11-24 09:39
阅读 2780·2021-07-29 13:49
阅读 2321·2019-08-29 14:15
阅读 2232·2019-08-29 12:40
阅读 3311·2019-08-26 13:42
阅读 631·2019-08-26 12:13
阅读 2064·2019-08-26 11:41
阅读 3344·2019-08-23 18:32