摘要:最近刚看完多线程,为了加深印象,按照分钟实现延迟消息功能的思路,实现了一个简易版的异步队列。读取任务时,计算当前和,取出需要执行的任务,使用多线程的形式执行。加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。
最近刚看完python多线程,为了加深印象,按照1分钟实现“延迟消息”功能的思路,实现了一个简易版的异步队列。
高效延时消息,包含两个重要的数据结构:
1.环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
2.任务集合,环上每一个slot是一个Set
同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。
Task结构中有两个很重要的属性:
(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针
下边是代码(代码不止100行,但是在200行内,也算100行了。)
#! -*- coding: utf-8 -*- try: import cPickle as pickle except ImportError: import pickle try: import simplejson as json except ImportError: import json import os import errno import Queue import random import logging from functools import wraps from threading import Timer, RLock, Thread from time import sleep, time from base64 import b64encode, b64decode # json 的数据结构 # tasks = { # index: { # cycle_num: [(func, bargs)] # } # } logging.basicConfig(level=logging.DEBUG, format="(%(asctime)-15s) %(message)s",) tasks_file = "tasks.json" flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY # 为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量 WORKER_NUMS = 2 q = Queue.Queue(WORKER_NUMS) lock = RLock() def check_file(): try: file_handle = os.open(tasks_file, flags) except OSError as e: if e.errno == errno.EEXIST: # Failed as the file already exists. pass else: raise else: with os.fdopen(file_handle, "w") as file_obj: file_obj.write("{}") def set_delay_task(func_name, *args, **kwargs): # 使用锁来保证每次只要一个线程写入文件,防止数据出错 with lock: with open(tasks_file, "r+") as json_file: count_down = kwargs.pop("count_down", 0) tasks = json.load(json_file) # 执行时间 exec_time = int(time()) + count_down # 循环索引 index = str(exec_time % 3600) # 圈数 cycle_num = str(exec_time / 3600 + 1) dargs = pickle.dumps((args, kwargs)) bargs = b64encode(dargs) index_data = tasks.get(index, {}) index_data.setdefault(cycle_num, []).append((func_name, bargs)) tasks[index] = index_data json_file.seek(0) json.dump(tasks, json_file) logging.debug("Received task: %s" % func_name) def get_delay_tasks(): with open(tasks_file, "r+") as json_file: tasks = json.load(json_file) # 执行时间 current_time = int(time()) # 循环索引 index = str(current_time % 3600) # 圈数 cycle_num = str(current_time / 3600 + 1) current_tasks = tasks.get(index, {}).get(cycle_num, []) tasks = [] for func, bargs in current_tasks: dargs = b64decode(bargs) args, kwargs = pickle.loads(dargs) tasks.append((func, (args, kwargs))) return tasks def get_method_by_name(method_name): possibles = globals().copy() possibles.update(locals()) method = possibles.get(method_name) return method def create_task(task_class, func, task_name=None, **kwargs): def execute(self): args, kwargs = self.data or ((), {}) return func(*args, **kwargs) attrs = { "execute": execute, "func_name": func.__name__, "__module__": func.__module__, "__doc__": func.__doc__ } attrs.update(kwargs) klass = type( task_name or func.__name__, (task_class,), attrs ) return klass class Hu(object): def __init__(self, func_name=None): self.func_name = func_name check_file() def task(self): def deco(func): self.func_name = func.__name__ klass = create_task(Hu, func, self.func_name) func.delay = klass(func_name=klass.func_name).delay @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return deco def delay(self, *args, **kwargs): _args = [self.func_name] _args.extend(args) Timer(0, set_delay_task, _args, kwargs).start() return True def boss(): while True: current_tasks = get_delay_tasks() for func, params in current_tasks: # Task accepted: auth.tasks.send_msg logging.debug("Task accepted: %s" % func) q.put((func, params)) sleep(1) def worker(): while True: func, params = q.get() print "get task: %s " % func method = get_method_by_name(func) args, kwargs = params # Task auth.tasks.send_msgsucceeded in start_time = time() method(*args, **kwargs) end_time = time() logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time)) q.task_done() def main(): check_file() print("starting at:", time()) for target in (boss, worker): t = Thread(target=target) t.start() print("all DONE at:", time()) hu = Hu() # 使用方式如下: @hu.task() def test(num): sleep(2) print "test: %s" % num if __name__ == "__main__": for i in range(10): test.delay(i, count_down=random.randint(1, 10)) main() # output (2017-03-21 15:59:20,394) Received task: test (2017-03-21 15:59:20,396) Received task: test (2017-03-21 15:59:20,397) Received task: test (2017-03-21 15:59:20,398) Received task: test (2017-03-21 15:59:20,400) Received task: test (2017-03-21 15:59:20,401) Received task: test (2017-03-21 15:59:20,403) Received task: test (2017-03-21 15:59:20,404) Received task: test (2017-03-21 15:59:20,406) Received task: test (2017-03-21 15:59:20,408) Received task: test get task: test (2017-03-21 15:59:21,395) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test test: 2 get task: test (2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796 (2017-03-21 15:59:24,404) Task accepted: test test: 1 get task: test
按照1分钟实现“延迟消息”功能的思路。队列的数据结构为
{ index: { cycle_num: [(func, bargs)] } }
index的值为 1-3600。每小时一个循环。
cycle_num 则是 由 (时间戳 / 3600 + 1) 计算得到的值,是圈数。
每当有任务加入,我们计算出index和cycle_num 将参数和方法名写入json文件。
读取任务时,计算当前 index和cycle_num, 取出需要执行的任务,使用多线程的形式执行。
为了防止任务太多需要生成过多的线程,我们使用Queue 来限制生成的线程数量。
加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。
当然,也可以使用redis 存储队列,因为 redis 是单线程操作,可以防止多线程操作影响数据一致性的问题。
这一部分有需要的可以自己实现。
参考:
python线程笔记
1分钟实现“延迟消息”功能
>欢迎关注 | >请我喝芬达 |
---|---|
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/38539.html
摘要:文章转自视频教程优雅的应用调试工具新扩展是由和开源的应用的调试工具。计划任务列出已运行的计划任务。该封闭函数会被序列化为一个长字符串,加上他的哈希与签名如出一辙该功能将记录所有异常,并可查看具体异常情况。事件显示所有事件的列表。 文章转自:https://laravel-china.org/topics/19013视频教程:047. 优雅的应用调试工具--laravel/telesco...
摘要:所以回来后就想着补一篇文章针对时间切片展开详细的讨论。所以时间切片的目的是不阻塞主线程,而实现目的的技术手段是将一个长任务拆分成很多个不超过的小任务分散在宏任务队列中执行。上周我在FDConf的分享《让你的网页更丝滑》中提到了时间切片,由于时间关系当时并没有对时间切片展开更细致的讨论。所以回来后就想着补一篇文章针对时间切片展开详细的讨论。 从用户的输入,再到显示器在视觉上给用户的输出,这一过...
摘要:我的这篇文章没有任何高大上的术语,就是行代码,实现一个最简单的区块链原型。检查该区块链是否有效。而通过在循环里不断尝试最终得到一个合法的哈希值的这一过程,就是区块链圈内俗称的挖矿。 不知从什么时候起,区块链在网上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 这里Jerry就不班门弄斧了,网上...
摘要:我的这篇文章没有任何高大上的术语,就是行代码,实现一个最简单的区块链原型。检查该区块链是否有效。而通过在循环里不断尝试最终得到一个合法的哈希值的这一过程,就是区块链圈内俗称的挖矿。 不知从什么时候起,区块链在网上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 这里Jerry就不班门弄斧了,网上...
阅读 703·2021-11-18 10:02
阅读 2237·2021-11-15 18:13
阅读 3144·2021-11-15 11:38
阅读 2938·2021-09-22 15:55
阅读 3670·2021-08-09 13:43
阅读 2441·2021-07-25 14:19
阅读 2451·2019-08-30 14:15
阅读 3444·2019-08-30 14:15