资讯专栏INFORMATION COLUMN

100行代码实现任务队列

xorpay / 660人阅读

摘要:最近刚看完多线程,为了加深印象,按照分钟实现延迟消息功能的思路,实现了一个简易版的异步队列。读取任务时,计算当前和,取出需要执行的任务,使用多线程的形式执行。加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。

最近刚看完python多线程,为了加深印象,按照1分钟实现“延迟消息”功能的思路,实现了一个简易版的异步队列。

高效延时消息,包含两个重要的数据结构:

1.环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)

2.任务集合,环上每一个slot是一个Set

同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:

(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针

下边是代码(代码不止100行,但是在200行内,也算100行了。)

#! -*- coding: utf-8 -*-

try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json

import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# json 的数据结构
# tasks = {
#     index: {
#         cycle_num: [(func, bargs)]
#     }
# }

logging.basicConfig(level=logging.DEBUG,
                    format="(%(asctime)-15s) %(message)s",)
tasks_file = "tasks.json"
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY

# 为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量
WORKER_NUMS = 2
q = Queue.Queue(WORKER_NUMS)

lock = RLock()


def check_file():
    try:
        file_handle = os.open(tasks_file, flags)
    except OSError as e:
        if e.errno == errno.EEXIST:  # Failed as the file already exists.
            pass
        else:
            raise
    else:
        with os.fdopen(file_handle, "w") as file_obj:
            file_obj.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    # 使用锁来保证每次只要一个线程写入文件,防止数据出错
    with lock:
        with open(tasks_file, "r+") as json_file:
            count_down = kwargs.pop("count_down", 0)
            tasks = json.load(json_file)
            # 执行时间
            exec_time = int(time()) + count_down
            # 循环索引
            index = str(exec_time % 3600)
            # 圈数
            cycle_num = str(exec_time / 3600 + 1)
            dargs = pickle.dumps((args, kwargs))
            bargs = b64encode(dargs)
            index_data = tasks.get(index, {})
            index_data.setdefault(cycle_num, []).append((func_name, bargs))
            tasks[index] = index_data
            json_file.seek(0)
            json.dump(tasks, json_file)
            logging.debug("Received task: %s" % func_name)


def get_delay_tasks():
    with open(tasks_file, "r+") as json_file:
        tasks = json.load(json_file)
        # 执行时间
        current_time = int(time())
        # 循环索引
        index = str(current_time % 3600)
        # 圈数
        cycle_num = str(current_time / 3600 + 1)
        current_tasks = tasks.get(index, {}).get(cycle_num, [])
    tasks = []
    for func, bargs in current_tasks:
        dargs = b64decode(bargs)
        args, kwargs = pickle.loads(dargs)
        tasks.append((func, (args, kwargs)))
    return tasks


def get_method_by_name(method_name):
    possibles = globals().copy()
    possibles.update(locals())
    method = possibles.get(method_name)
    return method


def create_task(task_class, func, task_name=None, **kwargs):

    def execute(self):
        args, kwargs = self.data or ((), {})
        return func(*args, **kwargs)

    attrs = {
        "execute": execute,
        "func_name": func.__name__,
        "__module__": func.__module__,
        "__doc__": func.__doc__
    }
    attrs.update(kwargs)

    klass = type(
        task_name or func.__name__,
        (task_class,),
        attrs
    )

    return klass


class Hu(object):

    def __init__(self, func_name=None):
        self.func_name = func_name
        check_file()

    def task(self):
        def deco(func):
            self.func_name = func.__name__
            klass = create_task(Hu, func, self.func_name)
            func.delay = klass(func_name=klass.func_name).delay
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        _args = [self.func_name]
        _args.extend(args)
        Timer(0, set_delay_task, _args, kwargs).start()
        return True


def boss():
    while True:
        current_tasks = get_delay_tasks()
        for func, params in current_tasks:
            # Task accepted: auth.tasks.send_msg
            logging.debug("Task accepted: %s" % func)
            q.put((func, params))
        sleep(1)


def worker():
    while True:
        func, params = q.get()
        print "get task: %s
" % func
        method = get_method_by_name(func)
        args, kwargs = params
        # Task auth.tasks.send_msgsucceeded in
        start_time = time()
        method(*args, **kwargs)
        end_time = time()
        logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time))
        q.task_done()


def main():
    check_file()
    print("starting at:", time())
    for target in (boss, worker):
        t = Thread(target=target)
        t.start()
    print("all DONE at:", time())

hu = Hu()

# 使用方式如下:

@hu.task()
def test(num):
    sleep(2)
    print "test: %s" % num


if __name__ == "__main__":
    for i in range(10):
        test.delay(i, count_down=random.randint(1, 10))
    main()

# output

(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test

(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test

(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test

按照1分钟实现“延迟消息”功能的思路。队列的数据结构为

{
    index: {
        cycle_num: [(func, bargs)]
    }
}

index的值为 1-3600。每小时一个循环。
cycle_num 则是 由 (时间戳 / 3600 + 1) 计算得到的值,是圈数。

每当有任务加入,我们计算出index和cycle_num 将参数和方法名写入json文件。
读取任务时,计算当前 index和cycle_num, 取出需要执行的任务,使用多线程的形式执行。

为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量。

加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。

当然,也可以使用redis 存储队列,因为 redis 是单线程操作,可以防止多线程操作影响数据一致性的问题。
这一部分有需要的可以自己实现。

参考:

python线程笔记

1分钟实现“延迟消息”功能

>欢迎关注 >请我喝芬达

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38539.html

相关文章

  • Laravel Telescope:优雅的应用调试工具

    摘要:文章转自视频教程优雅的应用调试工具新扩展是由和开源的应用的调试工具。计划任务列出已运行的计划任务。该封闭函数会被序列化为一个长字符串,加上他的哈希与签名如出一辙该功能将记录所有异常,并可查看具体异常情况。事件显示所有事件的列表。 文章转自:https://laravel-china.org/topics/19013视频教程:047. 优雅的应用调试工具--laravel/telesco...

    MasonEast 评论0 收藏0
  • 时间切片(Time Slicing)

    摘要:所以回来后就想着补一篇文章针对时间切片展开详细的讨论。所以时间切片的目的是不阻塞主线程,而实现目的的技术手段是将一个长任务拆分成很多个不超过的小任务分散在宏任务队列中执行。上周我在FDConf的分享《让你的网页更丝滑》中提到了时间切片,由于时间关系当时并没有对时间切片展开更细致的讨论。所以回来后就想着补一篇文章针对时间切片展开详细的讨论。 从用户的输入,再到显示器在视觉上给用户的输出,这一过...

    Freeman 评论0 收藏0
  • 300ABAP代码实现一个最简单的区块链原型

    摘要:我的这篇文章没有任何高大上的术语,就是行代码,实现一个最简单的区块链原型。检查该区块链是否有效。而通过在循环里不断尝试最终得到一个合法的哈希值的这一过程,就是区块链圈内俗称的挖矿。 不知从什么时候起,区块链在网上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 这里Jerry就不班门弄斧了,网上...

    cikenerd 评论0 收藏0
  • 300ABAP代码实现一个最简单的区块链原型

    摘要:我的这篇文章没有任何高大上的术语,就是行代码,实现一个最简单的区块链原型。检查该区块链是否有效。而通过在循环里不断尝试最终得到一个合法的哈希值的这一过程,就是区块链圈内俗称的挖矿。 不知从什么时候起,区块链在网上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 这里Jerry就不班门弄斧了,网上...

    DangoSky 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<