资讯专栏INFORMATION COLUMN

Redis实现任务队列

chanjarster / 2264人阅读

摘要:延迟任务使用列表结构可以实现只能执行一种任务的队列,也可以实现通过调用不同回调函数来执行不同任务的队列,甚至还可以实现简单的优先级队列。

在处理Web客户端发送的命令请求时,某些操作的执行时间可能会比我们预期的更长一些。通过将待执行任务的相关信息放入队列里面,并在之后对队列进行处理,用户可以推迟那些需要一段时间才能完成的操作,这种工作交给任务处理器来执行的做法被称为任务队列(task queue)。现在有很多专门的任务队列软件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下来实现两种不同类型的任务队列,第一种队列会根据任务被插入队列的顺序来尽快地执行任务,第二种队列具有安排任务在未来某个特定时间执行的能力。

先进先出队列

除了任务队列之外,还有先进先出(FIFO)队列、后进后出(LIFO)队列和优先级(priority)队列。
使用任务队列来记录邮件的收信人以及发送邮件的原因,并构建一个可以在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工作进程(worker process)。

要编写的队列将以“先到先服务”(first-come,first-served)的方式发送邮件,并且无论发送是否成功,程序都会把发送结果记录到日志里面。Redis的列表结构允许用户通过RPUSH和LPUSH以及RPOP和LPOP,从列表的两端推入和弹出元素。邮件队列使用RPUSH命令来将待发送的邮件推入列表的右端,并且因为工作进程除了发送邮件之外不需要执行其他工作,所以它将使用阻塞版本的弹出命令BLPOP从队列中弹出待发送的邮件,而命令的最大阻塞时限为30秒。

邮件队列由一个Redis列表构成,包含多个JSON编码对象。为了将待发送的邮件推入队列里面,程序会获取发送邮件所需的全部信息,并将这些信息序列化为JSON对象,最后使用RPUSH命令将JSON对象推入邮件队列里面。

def send_sold_email_via_queue(conn, seller, item, price, buyer):
data = {
    "seller_id": seller,
    "item_id": item,
    "price": price,
    "buyer_id": buyer,
    "time": time.time()
}
conn.rpush("queue:email", json.dumps(data))

从队列里获取待发送邮件,程序首先使用BLPOP命令从邮件队列里面弹出一个JSON对象,接着通过解码JSON对象来取得待发送邮件的相关信息,最后根据这些信息来发送邮件。

def process_sold_email_queue(conn):
    while not QUIT:
    packed = conn.blpop(["queue:email"], 30) //获取一封待发送邮件
    
    if not packed: //队列里面暂时还没有待发送邮件,重试
        continue
        
    to_send = json.loads(packed[1]) //从JSON对象中解码出邮件信息
    try:
        fetch_data_and_send_sold_email(to_send)
    except EmailSendError as err:
        log_error("Failed to send sold email", err, to_send)
    else:
        log_success("Send sold email", to_send)            
多个可执行任务

因为BLPOP命令每次只会从队列里面弹出一封待发送邮件,所以待发送邮件不会出现重复,也不会被重复发送。并且因为队列只会存放待发送邮件,所以工作进程要处理的任务是非常单一的。下面代码的工作进程会监视用户提供的多个队列,并从多个已知的已注册回调函数里面,选出一个函数来处理JSON编码的函数调用。

def worker_watch_queue(conn, queue, callback):
    while not QUIT:
        packed = conn.blpop([queue], 30)
        if not packed:
            continue
            
        name, args = json.loads(packed[1])
        if name not in callbacks: //没有找到任务指定的回调函数,用日志记录错误并重试
            log_error("Unknown callback %s"%name)
            continue
        callbacks[name](*args) //执行任务
任务优先级

在使用队列的时候,程序可能会需要让特定的操作优先于其他操作执行。

假设现在我们需要为任务设置高、中、低3种优先级别,其中:高优先级任务在出现之后会第一时间被执行,而中等优先级任务则会在没有任何高优先级任务存在的情况下被执行,而低优先级任务则会在既没有任何高优先级任务,又没有任何中等优先级任务的情况下被执行。

def worker_watch_queues(conn, queues, callbacks):
    while not QUIT:
        packed = conn.blpop(queues, 30)
        if not packed:
            continue
        
        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error("Unknown callback %s"%name)
            continue
        callbacks[name](*args)

同时使用多个队列可以降低实现优先级特性的难度。除此之外,多队列有时候也会被用于分隔不同的任务(如同一个队列存放公告邮件,而另一个队列则存放提醒邮件),在这种情况下,处理不同队列时可能出现不公平现象。为此,我们可以偶尔重新排列各个队列的顺序,使得针对队列的处理操作变得更公平一些,当某个队列的增长速度比其他队列的增长速度快的时候,这种重拍操作尤为重要。

延迟任务

使用列表结构可以实现只能执行一种任务的队列,也可以实现通过调用不同回调函数来执行不同任务的队列,甚至还可以实现简单的优先级队列。
以下3种方法可以为队列中的任务添加延迟性质:

在任务信息中包含任务的执行时间,如果工作进程发现任务的执行时间尚未来临,那么它将在短暂等待之后,把任务重新推入队列里面。

工作进程使用一个本地的等待列表来记录所有需要在未来执行的任务,并在每次进行while循环的时候,检查等待列表并执行那些已经到期的任务。

把所有需要在未来执行的任务都添加到有序集合里面,并将任务的执行时间设置为分值,另外再使用一个进程来查找有序集合里面是否存在可以立即被执行的任务,如果有的话,就从有序集合里面移除那个任务,并将它添加到适当得任务队列里面。

因为无论是进行短暂的等待,还是将任务重新推入队列里面,都会浪费工作进程的时间,所以我们不会采用第一种方法。此外,因为工作进程可能会因为崩溃而丢失本地记录的所有待执行任务,所以我们也不会采用第二种方法。最后,因为使用有序集合的第三种方法最简单直接,所以我们将采取这一方法,并使用锁来保证任务从有序集合移动到任务队列时的安全性。

有序集合队列(ZSET queue)存储的每个被延迟的任务都是一个包含4个值的JSON列表,这4个分值分别是:唯一标识符、处理任务队列的名字、处理任务的回调函数的名字、传给回调函数的参数。在有序集合里面,任务的分值会被设置为任务的执行时间,而立即可执行的任务将被直接插入任务队列里面。下面代码展示了创建延迟任务(任务是否延迟是可选的,只要把任务的延迟时间设置为0就可以创建一个立即执行的任务)。

def execute_later(conn, queue, name, args, delay=0):
    identifier = str(uuid.uuid4())
    item = json.dumps([identifier, queue, name, args])
    if delay > 0:
        conn.zadd("delayed:", item, time.time() + delay)
    else:
        conn.rpush("queue:" + queue, item)
    return identifier

因为Redis没有提供直接的方法可以阻塞有序集合直到元素的分值低于当前UNIX时间戳为止,所以我们需要自己来查找有序集合里面分值低于当前UNIX时间戳的任务。因为所有被延迟的任务都存储在同一个有序集合队列里面,所以程序只需要获取有序集合里面排名第一的元素以及该元素的分值就可以了:如果队列里面没有任何任务,或者任务的执行时间尚未来临,那么程序将在短暂等待之后重试;如果任务的执行时间已到,那么程序将根据任务包含的标识符来获取一个细粒度锁,接着从有序集合里面移除要被执行的任务,并将它添加到适当的任务队列里面。通过将可执行的任务添加到任务队列里面而不是直接执行它们,我们可以把获取可执行任务的进程数量限制在一两个之内,而不必根据工作进程的数量来决定运行多少个获取进程,这减少了获取可执行任务所需的花销。

def poll_queue(conn):
    while not QUIT:
        item = conn.zrange("delayed:", 0, 0, withscores=True)
        if not item or item[0][1] > time.time():
            time.sleep(.01)
            continue
        
        item = item[0][0]
        identifier, queue, function, args = json.loads(item)
        
        locked = acquire_lock(conn, identifier)
        if not locked:
            continue
        
        if conn.zrem("delayed:", item):
            conn.rpush("queue:" + queue, item)
        
        release_lock(conn, identifier, locked)

因为有序集合并不具备像列表那样的阻塞弹出机制,所以程序需要不断地进行循环,并尝试从队列里面获取要被执行的任务,虽然这一操作会增大网络和处理器的负载,但因为我们只会运行一两个这样的程序,所以不会消耗太多资源。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/44607.html

相关文章

  • laravel 队列

    摘要:如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。这两个进程操纵了三个队列,其中一个,负责即时任务,两个,负责延时任务与待处理任务。如果任务执行成功,就会删除中的任务,否则会被重新放入队列中。 在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试...

    BDEEFE 评论0 收藏0
  • laravel/lumen 使用 redis队列

    摘要:配置项用于配置失败队列任务存放的数据库及数据表。要使用队列驱动,需要在配置文件中配置数据库连接。如果应用使用了,那么可以使用时间或并发来控制队列任务。你可以使用命令运行这个队列进程。如果队列进程意外关闭,它会自动重启启动队列进程。 一、概述 在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系...

    mengbo 评论0 收藏0
  • Redis 实现队列

    摘要:场景说明用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求抢购场景,先入先出的模式命令或往列表右侧推入数据客户端阻塞直到队列有 场景说明: 用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 高并发场景,当某个时刻请求瞬间增加时,可以把请...

    PascalXie 评论0 收藏0

发表评论

0条评论

chanjarster

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<