摘要:方案介绍该方案出来的场景一天有一个业务需求,需要把我方的一些信息或订单状态等异步发起请求同步给第三方,这里就会出现定时时间和延迟时间消息的处理,考虑过很多消息队列方案如云消息服务等。
方案介绍
方案架构图 mysql的任务队列表该方案出来的场景:一天有一个业务需求,需要把我方的一些信息或订单状态等异步发起请求同步给第三方,这里就会出现定时时间和延迟时间消息的处理,考虑过很多消息队列方案(如:rabbitmq、云消息服务等)。
不过最后公司定了因为该业务流量很小,不用做那么麻烦。所以就直接出了这个方案
该方案在50条消息/s,应该压力不大,量大了就会出现一个消息延迟问题,如果不注重这个业务时间准确性,该方案承载的秒级处理在1000内应该问题也不大
CREATE TABLE `open_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `type` tinyint(4) unsigned NOT NULL DEFAULT "0" COMMENT "任务类型,可用于后续读取不同任务失败次数区分", `order_id` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "业务绑定的订单id等,根据自己自行设计业务id", `event_identity` varchar(30) NOT NULL DEFAULT "" COMMENT "事件表示分类,可不用,同type", `data` text NOT NULL COMMENT "需要处理的数据,json格式化", `try` tinyint(4) unsigned NOT NULL DEFAULT "1" COMMENT "特定任务需要当时就处理的次数,而不是发起回调请求的任务", `again` tinyint(1) unsigned NOT NULL DEFAULT "0" COMMENT "0不是重试记录 1重试记录,失败后发起任务未1,否者未0", `status` tinyint(1) unsigned NOT NULL DEFAULT "0" COMMENT "是否被执行 1是 0否", `create_time` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "任务创建的时间戳", `at_time` int(11) unsigned NOT NULL DEFAULT "0" COMMENT "任务时间的时间戳", `task_status` tinyint(1) NOT NULL DEFAULT "0" COMMENT "执行结果 -1重复消息系统主动取消 0未执行 1执行成功 2执行失败", `task_time` int(10) unsigned NOT NULL DEFAULT "0" COMMENT "执行时间", PRIMARY KEY (`id`), KEY `IDX_EVENT` (`event_identity`), KEY `IDX_AT_TIME` (`at_time`) USING BTREE, KEY `IDX_ORDER` (`order_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;方案实现源码
redis同步到mysql下方代码都是伪代码,大家自行根据思路自己设计业务
该步骤可以省略,因为我方当时redis是单机部署,也没做持久化,
$redisTypeLock; //lock类型 if (!IS_WIN) { $file = fopen($path . "/lock/{$redisTypeLock}.lock","w+"); if (!flock($file, LOCK_EX | LOCK_NB)) { flock($file,LOCK_UN); fclose($file); exit; } } //成功获得锁 开始业务执行 $count = get_redis_lLen("msgevent:" . $redisType); if (!$count) { //无需要入数据库订单队列,直接返回 return false; } $successCount = 0; for ($i = 0; $i < $loopLen; $i++) { $data = get_redis_lPop("msgevent:" . $redisType); //把redis数据格式化存入到数据库持久化 $result = $this->saveToMysql($data); if (!$result) { //如果执行失败,重新推入redis队列 get_redis_lPush("msgevent:" . $redisType, $data); } $successCount++; } if (!IS_WIN) { flock($file, LOCK_UN); fclose($file); }定时执行任务的的入口
这个入口是定时秒级处理脚本和处理小于当前时间的脚本调用的入口
public function task_run() { $timeType = $_GET["tt"] ? : "now"; //获取当前秒需要处理的所有消息, 依次路由后执行 //如果你redis无需存入mysql,你可以redis的list结构实现, 一次全部取出数据集合,并删除list //如果接下去业务里发现发起请求失败了,重新把任务分配给redis建立恢复list,list名为t+需要什么时间执行的时间戳 $lists = $this->getTaskLists($timeType); foreach ($lists as $item) { $this->task("event:" . $item["event_type"]); //$this->task("event:demo1"); //$this->task("event:demo2"); //$this->task("event:demo3"); } }实际处理业务(发起通知请求)
# 模拟一个发起的请求事件demo private function pushDemo1Event($info) { $sendInfoData = $this->getSendInfoFromInfo($info); //发起请求,这里做的真正的发起给对方的请求,你可以根据$sendInfo拼接各种事件或消息分类给对方 $isSuccess = $this->sendInfo($sendInfoData); $data = array( "status" => 1, "task_status" => $isSuccess ? 1 : 2, "task_time" => NOW_TIME ); $db->save($data); //如果执行失败,重新插入一条记录,并把at_time生成下次执行的时间戳,这样定时器根据at_time字段可以取出判断执行到当时的秒级 if (!$isSuccess) { $tryCount = $db->where(...)->count(); if ($tryCount >= 5) { return false; } $this->newTask($info, $tryCount, 30); } } private function newTask($info, $tryCount, $timeout = 30) { $newTime = $this->nextTaskTime($info, $timeout); $data = array( ... "at_time" => newTime //执行的时间(时间戳) ); $db->add($data); }定时消费方
$timeType = $_GET["time_type"] ? : "now"; $lists = $this->getMsgAll($timeType); if (!$lists) { return false; } foreach ($lists as $item) { if ($item["type"] == "1") { //根据业务生成我方真实订单 call_user_func_array(array($this, "add_real_order"), array($item)); } if (in_array($item["type"], array(2,3))) { $this->push("push:toengineer", $item); //指派或取消工程师 } if ($item["type"] == "5") { $this->push("push:edit", $item); //编辑 } if ($item["type"] == "6") { $this->push("push:cancel", $item); //取消 } if ($item["type"] == "7" || $item["type"] == "8" || $item["type"] == "9") { $this->push("push:status", $item); //简单订单状态 } }借助crontab+shell外力实现秒级执行
读取redis任务队列到mysql存储因为crontab是最小单位是分钟,所以需要借助shell脚本来实现秒级执行
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do #执行php直接返回,不会阻塞 不要忘记最后面的 & $(/usr/bin/php index.php redis2mysql > /dev/null 2>&1 &) sleep 1 done exit 0处理秒级时间sh脚本
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do $(/usr/bin/php index.php task_run > /dev/null 2>&1 &) sleep 1 done exit 0处理秒级小于当前时间sh脚本
#!/bin/bash cd /web/project/src for (( i = 1; i < 60; i = i + 1 )) do $(/usr/bin/php index.php task_run/tt/lt > /dev/null 2>&1 &) sleep 1 done exit 0linux下的crontab
* * * * * sh /web/project/src/shell/repair_build_order.sh * * * * * sh /web/project/src/shell/repair_sync_second.sh * * * * * sh /web/project/src/shell/repair_sync_lt_time.sh
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/26001.html
摘要:的毫秒级超时也有问题。。中超时实现一初级最简单的超时实现秒级超时思路很简单链接一个后端,然后设置为非阻塞模式,如果没有连接上就一直循环,判断当前时间和超时时间之间的差异。实际处理这个调用的部件在完成后,通过状态通知和回调来通知调用者。 概述 在PHP开发中工作里非常多使用到超时处理到超时的场合,我说几个场景: 异步获取数据如果某个后端数据源获取不成功则跳过,不影响整个页面展现 为了保...
摘要:在软件项目中,定时器也被应用到了各方各面,本文将从项目入手,讲述定时器,本文的例子都以为例。定时器总类定时器有两种对应重复任务和一次性任务。 在大规模分布式系统中,每个业务都可能是集群,每个业务机都会产生定时任务,不同的业务会有不同的任务管理需求,统一的任务调度和管理变得非常有必要。 定时如何准确,大量的定时被同时触发怎么办? 定时结束的时候,怎么通知业务机去处理呢? 某台业务机下线...
摘要:在软件项目中,定时器也被应用到了各方各面,本文将从项目入手,讲述定时器,本文的例子都以为例。定时器总类定时器有两种对应重复任务和一次性任务。 在大规模分布式系统中,每个业务都可能是集群,每个业务机都会产生定时任务,不同的业务会有不同的任务管理需求,统一的任务调度和管理变得非常有必要。 定时如何准确,大量的定时被同时触发怎么办? 定时结束的时候,怎么通知业务机去处理呢? 某台业务机下线...
摘要:诸如此类,队列的应用范围是如此之广。方案抽象到更高一层,开发一套通用异步处理队列适用于任何复杂的业务逻辑那么,作为架构师,使用队列的做法,将抽象层和业务层分离,可具有良好的扩展性和可维护性。 一、队列使用场景:为什么需要队列 在web开发中,我们经常会遇到需要处理批量任务的时候,这些批量任务可能是用户提交的,也可能是当系统被某个事件触发时需要进行批量处理的,面对这样的任务,如果是用户提...
阅读 3384·2021-11-25 09:43
阅读 3447·2021-11-19 09:40
阅读 2384·2021-10-14 09:48
阅读 1260·2021-09-09 11:39
阅读 1903·2019-08-30 15:54
阅读 2801·2019-08-30 15:44
阅读 1977·2019-08-29 13:12
阅读 1515·2019-08-29 12:59