资讯专栏INFORMATION COLUMN

Redis 实现队列

lifesimple / 771人阅读

摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有

场景说明:

用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时

高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求

抢购场景,先入先出的模式

命令:

rpush + blpop 或 lpush + brpop

rpush : 往列表右侧推入数据  
blpop : 客户端阻塞直到队列有值输出
简单队列: simple.php
$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000");
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $redis->rPush("goods:task", json_encode($row));
}

$redis->close();

获取20000万个商品,并把json化后的数据推入goods:task队列

queueBlpop.php
// 出队
while (true) {
    // 阻塞设置超时时间为3秒
    $task = $redis->blPop(array("goods:task"), 3);
    if ($task) {
        $redis->rPush("goods:success:task", $task[1]);
        $task = json_decode($task[1], true);
        echo $task["id"] . ":" . $task["cid"] . ":" . "handle success";
        echo PHP_EOL;
    } else {
        echo "nothing" . PHP_EOL;
        sleep(5);
    }
}

设置blpop阻塞时间为3秒,当有数据出队时保存到goods:success:task表示执行成功,当队列没有数据时,程序睡眠10秒重新检查goods:task是否有数据出队

cli 模式执行命令:
php simple.php
php queueBlpop.php
优先级队列
思路:

blpop 有多个键时,blpop会从左至右遍历键,一旦一个键能弹出元素,客户端立即返回。例如:

blpop key1 key2 key3 key4

从key1到key4遍历,如果哪个key有值,则弹出这个值,若多个key同时有值时,优先弹出排在左边的key。

priority.php
// 设置优先级队列
$high = "goods:high:task";
$mid = "goods:mid:task";
$low = "goods:low:task";

$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000");
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    // cid 小于100放在低级队列
    if ($row["cid"] < 100) {
        $redis->rPush($low, json_encode($row));
    }
    // cid 100到600之间放在中级队列
    elseif ($row["cid"] > 100 && $row["cid"] < 600) {
        $redis->rPush($mid, json_encode($row));
    }
    // cid 大于600放在高级队列 
    else {
        $redis->rPush($high, json_encode($row));
    }
}
$redis->close();
priorityBlop.php
// 优先级队列
$high = "goods:high:task";
$mid = "goods:mid:task";
$low = "goods:low:task";

// 出队
while(true){
    // 优先级高的队列放在左侧
    $task = $redis->blPop(array($high, $mid, $low), 3);
    if ($task) {
        $task = json_decode($task[1], true);
        echo $task["id"] . ":" . $task["cid"] . ":" . "handle success";
        echo PHP_EOL;
    } else {
        echo "nothing" . PHP_EOL;
        sleep(5);
    }
}

优先级高的队列放在blpop命令左侧,依次排序,blpop命令会依次弹出high, mid, low队列的值

cli 模式执行命令:
php priority.php
php priorityBlpop.php
延迟队列
思路:

可以用一个有序集合来保存延迟任务,member保存任务内容,score保存(当前时间 + 延时时间)。用时间作为score。程序只要用有序集合的第一条任务的score和当前时间做比较,如果当前时间比score小,说明有序集合的所有任务还没到执行时间。

delay.php
$stmt = $pdo->prepare("select id, cid, name from zc_goods limit 200000");
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
    $redis->zAdd("goods:delay:task", time() + rand(1, 300), json_encode($row));
}

将20万条任务导入有序集合goods:delay:task,所有任务延迟到之后的1秒到300秒内执行

delayHandle.php
while (true) {
    // 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间
    // 相对说明集合的其他任务未到执行时间
    $rs = $redis->zRange("goods:delay:task", 0, 0, true);
    // 集合没有任务,睡眠时间设置为5秒
    if (empty($rs)) {
        echo "no tasks , sleep 5 seconds" . PHP_EOL;
        sleep(5);
        continue;
    }

    $taskJson = key($rs);
    $delay = $rs[$taskJson];
    $task = json_decode($taskJson, true);
    $now = time();

    // 到时间执行延时任务
    if ($delay <= $now) {
        // 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改
        if (!($identifier = acquireLock($task["id"]))) {
            continue;
        }

        // 移动延时任务到任务队列
        $redis->zRem("goods:delay:task", $taskJson);
        $redis->rPush("goods:task", $taskJson);
        echo $task["id"] . " run " . PHP_EOL;

        // 释放锁
        releaseLock($task["id"], $identifier);
    } else {
        // 延时任务未到执行时间
        $sleep = $delay - $now;
        // 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理
//        $sleep = $sleep > 2 ? 2 :$sleep;
        echo "wait " . $sleep . " seconds " . PHP_EOL;
        sleep($sleep);
    }
}

这个文件对有序集合内的延迟任务做处理,如果延迟任务到了执行时间,则把延迟任务移动到任务队列中

queueBlpop.php
// 出队
while (true) {
    // 阻塞设置超时时间为3秒
    $task = $redis->blPop(array("goods:task"), 3);
    if ($task) {
        $redis->rPush("goods:success:task", $task[1]);
        $task = json_decode($task[1], true);
        echo $task["id"] . ":" . $task["cid"] . ":" . "handle success";
        echo PHP_EOL;
    } else {
        echo "nothing" . PHP_EOL;
        sleep(5);
    }
}

处理任务队列中的任务

cli模式下执行命令:
php delay.php
php delayHanlde.php
php queueBlpop.php

完整代码:https://github.com/wuzhc/demo...
相关项目: https://github.com/wuzhc/gmq

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70394.html

相关文章

  • laravel 队列

    摘要:如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。这两个进程操纵了三个队列,其中一个,负责即时任务,两个,负责延时任务与待处理任务。如果任务执行成功,就会删除中的任务,否则会被重新放入队列中。 在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试...

    BDEEFE 评论0 收藏0
  • laravel/lumen 使用 redis队列

    摘要:配置项用于配置失败队列任务存放的数据库及数据表。要使用队列驱动,需要在配置文件中配置数据库连接。如果应用使用了,那么可以使用时间或并发来控制队列任务。你可以使用命令运行这个队列进程。如果队列进程意外关闭,它会自动重启启动队列进程。 一、概述 在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系...

    mengbo 评论0 收藏0

发表评论

0条评论

lifesimple

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<