资讯专栏INFORMATION COLUMN

PHP物联网开发利器之Actor并发模型

ixlei / 2407人阅读

摘要:然而尽管如此,很多人可能都没有思考过,如何优雅的写出自己的物联网服务器。

PHP不适合做物联网服务端吗?

在传统的思维中,经常会有人告诉你,php不适合用来做物联网服务端,让你换java,node,go等其他语言,是的,没错传统意义上的php,确实很难做物联网服务器,因为它实在太蹩脚了,当然,这也不是意味着彻底就不能做。举个例子,当你想实现一个TCP服务器的时候,你可能需要写出原理大约如下的代码:

</>复制代码

  1. for ($i = 0;$i <= 1;$i++){
  2. $pid = pcntl_fork();
  3. if($pid){
  4. if($i == 0){
  5. $server = stream_socket_server("tcp://127.0.0.1:9501", $errno, $errstr, STREAM_SERVER_BIND);
  6. }else if($i == 1){
  7. $tickTime = time()+3600;
  8. while (1){
  9. usleep(1);
  10. if($tickTime == time()){
  11. //do my tick func
  12. }
  13. }
  14. }
  15. }
  16. }

以上代码的意义等于在一个进程中创建一个TCP 服务端,另外一个进程中死循环来做时间检测,从而实现定时器逻辑。这样看起来,确实很蹩脚,而且对于编程基础普遍比较薄弱的PHPer来说,这真的很难维护。当然这个时候,就会有人说,这不是还有Workerman吗,是的,确实还有Workerman,Workerman就是高度封装了上述代码原理,帮助你专心于实现代码逻辑的一个PHP多进程框架,因此说PHP不时候做物联网,其实这是谬论。当然这个时候可能又会有人说,go语言有协程,你用Workerman当出现阻塞数据库调用的时候,那效率就非常的差,很难出现高并发,这么说没错,但是实际上,我们可以尽可能的用多进程去弥补这个不足,也就是堆机器。当然,如果你真的想锱铢必较,没关系,这个时候我们就可以拿出我们的杀器,那就是Swoole4.x的协程。

Swoole做TCP服务器

举个例子,如下代码:

</>复制代码

  1. $server = new swoole_server("127.0.0.1", 9501);
  2. $server->on("workerstart",function ($ser,$workerId){
  3. if($workerId == 0){
  4. swoole_timer_tick(1000,function (){
  5. });
  6. }
  7. });
  8. $server->on("connect", function ($server, $fd){
  9. echo "connection open: {$fd}
  10. ";
  11. });
  12. $server->on("receive", function ($server, $fd, $reactor_id, $data) {
  13. $server->send($fd, "Swoole: {$data}");
  14. $server->close($fd);
  15. });
  16. $server->on("close", function ($server, $fd) {
  17. echo "connection close: {$fd}
  18. ";
  19. });
  20. $server->start();

我们就可以很快的创建出一个多进程的协程TCP服务器,而且在各个回调函数内,均自动创建协程环境,我们可以在协程回调内,去调用协程的数据库API,这样就避免了因为阻塞数据库调用而导致无法处理其他客户端请求的问题。然而尽管如此,很多人可能都没有思考过,如何优雅的写出自己的物联网服务器。举个例子,我们常见的互联网设备管理服务中,大约可能出现如下代码:

</>复制代码

  1. swoole_timer_tick(5000,function (){
  2. $deviceList = $db->getAll();
  3. foreach ($deviceList as $device){
  4. //do your check
  5. /*
  6. * 例如设备状态处于1,那么需要处理流程1
  7. * 例如设备状态处于2,那么需要处理流程2
  8. * 例如设备状态处于3,那么需要处理流程3
  9. */
  10. }
  11. });

</>复制代码

  1. 定时遍历检查设备状态以及广播
    这样乍一看好像无伤大雅,但是当出现多种设备,且每种设备逻辑都不一致的时候,那么这样的编写模式就很容易写出一大坨代码出来,而且在协程下,如果不注意变量访问安全与协程上下文隔离,那么就很容易出现bug,导致很难维护。
Actor模型

什么是Actor,简单来说,Actor就是一种高度抽象化的并发模型,每个Actor实例的内存空间都是互相隔离的,用于降低用户编程与维护难度。关于Swoole4.x如何实现协程版本的Actor,我们之前已经在文章 https://segmentfault.com/a/11... 中讲解了如何用Swoole实现协程的原理。

Actor模型库实战

我们依旧用easyswoole/actor库来讲解,例如,我们有一种型号的设备,那么我们可以定义一个设备Actor,并把该设备的全部逻辑,写在该actor模型内,例子代码如下:

</>复制代码

  1. namespace AppDevice;
  2. use EasySwooleActorAbstractActor;
  3. use EasySwooleActorActorConfig;
  4. use EasySwooleEasySwooleLogger;
  5. use EasySwooleEasySwooleServerManager;
  6. use EasySwooleEasySwooleTrigger;
  7. class DeviceActor extends AbstractActor
  8. {
  9. private $fd;
  10. private $deviceId;
  11. private $lastHeartBeat;
  12. public static function configure(ActorConfig $actorConfig)
  13. {
  14. $actorConfig->setActorName("Device");
  15. }
  16. protected function onStart()
  17. {
  18. $this->lastHeartBeat = time();
  19. /*
  20. * 该参数是创建的时候传递的
  21. */
  22. $this->fd = $this->getArg()["fd"];
  23. $this->deviceId = $this->getArg()["deviceId"];
  24. //记录到table manager中
  25. DeviceManager::addDevice(new DeviceBean([
  26. "deviceId"=>$this->deviceId,
  27. "actorId"=>$this->actorId(),
  28. "fd"=>$this->fd
  29. ]));
  30. //推送消息
  31. ServerManager::getInstance()->getSwooleServer()->push($this->fd,"connect to server success,your actorId is {$this->actorId()}");
  32. //创建一个定时器,如果一个设备20s没有收到消息,自动下线
  33. $this->tick(20*2000,function (){
  34. if(time() - $this->lastHeartBeat > 20){
  35. $this->exit(-1);
  36. }
  37. });
  38. }
  39. protected function onMessage($msg)
  40. {
  41. if($msg instanceof Command){
  42. switch ($msg->getCommand()){
  43. case $msg::RECONNECT:{
  44. DeviceManager::updateDeviceInfo($this->deviceId,[
  45. "fd"=>$msg->getArg()
  46. ]);
  47. $this->fd = $msg->getArg();
  48. Logger::getInstance()->console("deviceId {$this->deviceId} at actorId {$this->actorId()} reconnect success");
  49. ServerManager::getInstance()->getSwooleServer()->push($this->fd,"deviceId {$this->deviceId} at actorId {$this->actorId()} reconnect success");
  50. break;
  51. }
  52. case $msg::WS_MSG:{
  53. $recv = $msg->getArg();
  54. Logger::getInstance()->console("deviceId {$this->deviceId} at actorId {$this->actorId()} recv ws msg: {$recv}");
  55. ServerManager::getInstance()->getSwooleServer()->push($this->fd,"actor recv msg for hash ".md5($recv));
  56. break;
  57. }
  58. case $msg::REPLY_MSG:{
  59. $recv = $msg->getArg();
  60. Logger::getInstance()->console("deviceId {$this->deviceId} at actorId {$this->actorId()} recv reply msg: {$recv}");
  61. ServerManager::getInstance()->getSwooleServer()->push($this->fd,"actor recv reply msg ".$recv);
  62. //此处return 一个数据,会返回给客户端
  63. return "actorId {$this->actorId()} recv {$recv}";
  64. break;
  65. }
  66. }
  67. }
  68. }
  69. protected function onExit($arg)
  70. {
  71. if($arg == -1){
  72. if(ServerManager::getInstance()->getSwooleServer()->exist($this->fd)){
  73. ServerManager::getInstance()->getSwooleServer()->push($this->fd,"heartbeat lost,actor exit");
  74. ServerManager::getInstance()->getSwooleServer()->close($this->fd);
  75. }
  76. }
  77. DeviceManager::deleteDevice($this->deviceId);
  78. Logger::getInstance()->console("deviceId {$this->deviceId} at actorId {$this->actorId()} exit");
  79. }
  80. protected function onException(Throwable $throwable)
  81. {
  82. Trigger::getInstance()->throwable($throwable);
  83. }
  84. }

在该Actor内,我们定义了这个设备的生命周期行为。

设备上线,记录设备id与fd信息,并创建心跳周期检查

收到消息,可以对该Actor投递数据,处理对应的消息行为

设备下线,当设备下线,可以自动的清理定时器与其他的一些通知与清理逻辑

我们可以很清楚的看到,Actor模型下,允许我们对一种设备模型进行高度自治的管理。当然,我们本章节主要在讲解如何优雅的利用Swoole协程来实现Actor模型,从而更好的开发管理我们的设备,因此我不再贴过多的代码,有兴趣的同学可以在Easyswoole框架demo中查看完整的示例代码https://github.com/easy-swool...

Easyswoole项目主页:http://easyswoole.com/
Easyswoole github 主仓库https://github.com/easy-swool... ,如果你觉得我们的努力有对你起到帮助作用,记得给个star

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/31668.html

相关文章

  • php极简框架 one 1.6.7发布,新增分布式并发模型Actor

    摘要:新增新增模型方法,主动刷新数据表结构缓存。分布式并发模型是什么是一种与共享内存对应的并发模型,具有资源独占性。都分布在不同的机器上。 One - 极简 . 高性能 . 松耦合 . 分布式 . 可运行于多种环境(cli,apache/php-fpm,swoole) 码云: https://gitee.com/vicself/onegithub: https://github.com/li...

    刘明 评论0 收藏0
  • 联网并发编程网络编程中的线程模型

    摘要:如需了解更多物联网网络编程知识请点击物联网云端开发武器库物联网高并发编程之网络编程中的线程模型值得说明的是,具体选择线程还是进程,更多是与平台及编程语言相关。 如需了解更多物联网网络编程知识请点击:物联网云端开发武器库 物联网高并发编程之网络编程中的线程模型 值得说明的是,具体选择线程还是进程,更多是与平台及编程语言相关。例如 C 语言使用线程和进程都可以(例如 Nginx 使用进程...

    ziwenxie 评论0 收藏0
  • PHP下用Swoole实现Actor并发模型

    摘要:协程与信箱得益于,我们可以基于的协程与快速实现一个信箱模式调度。样例代码比如在一个聊天室中,我们可以定义一个房间模型。 什么是Actor? Actor对于PHPer来说,可能会比较陌生,写过Java的同学会比较熟悉,Java一直都有线程的概念(虽然PHP有Pthread,但不普及),它是一种非共享内存的并发模型,每个Actor内的数据独立存在,Actor之间通过消息传递的形式进行交互调...

    GeekQiaQia 评论0 收藏0
  • Akka系列(六):Actor解决了什么问题?

    摘要:原文链接解决了什么问题使用模型来克服传统面向对象编程模型的局限性,并应对高并发分布式系统所带来的挑战。在某些情况,这个问题可能会变得更糟糕,工作线程发生了错误但是其自身却无法恢复。 这段时间由于忙毕业前前后后的事情,拖更了很久,表示非常抱歉,回归后的第一篇文章主要是看到了Akka最新文档中写的What problems does the actor model solve?,阅读完后觉...

    Carson 评论0 收藏0

发表评论

0条评论

ixlei

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<