摘要:因为它是线程安全的,所以多个线程很轻松地使用同一个实例。后进先出队列使用后进先出顺序,与栈结构相似这就是全部代码了,这正是设计很棒的一个原因,它将底层的数据操作抽象成四个操作函数,本身来处理线程安全的问题,使得其子类只需关注底层的操作。
起步
queue 模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。
源码分析先从初始化的函数来看:
class Queue: def __init__(self, maxsize=0): # 设置队列的最大容量 self.maxsize = maxsize self._init(maxsize) # 线程锁,互斥变量 self.mutex = threading.Lock() # 由锁衍生出三个条件变量 self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def _init(self, maxsize): # 初始化底层数据结构 self.queue = deque()
从这初始化函数能得到哪些信息呢?首先,队列是可以设置其容量大小的,并且具体的底层存放元素的它使用了 collections.deque() 双端列表的数据结构,这使得能很方便的做先进先出操作。这里还特地抽象为 _init 函数是为了方便其子类进行覆盖,允许子类使用其他结构来存放元素(比如优先队列使用了 list)。
然后就是线程锁 self.mutex ,对于底层数据结构 self.queue 的操作都要先获得这把锁;再往下是三个条件变量,这三个 Condition 都以 self.mutex 作为参数,也就是说它们共用一把锁;从这可以知道诸如 with self.mutex 与 with self.not_empty 等都是互斥的。
基于这些锁而做的一些简单的操作:
class Queue: ... def qsize(self): # 返回队列中的元素数 with self.mutex: return self._qsize() def empty(self): # 队列是否为空 with self.mutex: return not self._qsize() def full(self): # 队列是否已满 with self.mutex: return 0 < self.maxsize <= self._qsize() def _qsize(self): return len(self.queue)
这个代码片段挺好理解的,无需分析。
作为队列,主要得完成入队与出队的操作,首先是入队:
class Queue: ... def put(self, item, block=True, timeout=None): with self.not_full: # 获取条件变量not_full if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full # 如果 block 是 False,并且队列已满,那么抛出 Full 异常 elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() # 阻塞直到由剩余空间 elif timeout < 0: # 不合格的参数值,抛出ValueError raise ValueError(""timeout" must be a non-negative number") else: endtime = time() + timeout # 计算等待的结束时间 while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full # 等待期间一直没空间,抛出 Full 异常 self.not_full.wait(remaining) self._put(item) # 往底层数据结构中加入一个元素 self.unfinished_tasks += 1 self.not_empty.notify() def _put(self, item): self.queue.append(item)
尽管只有二十几行的代码,但这里的逻辑还是比较复杂的。它要处理超时与队列剩余空间不足的情况,具体几种情况如下:
如果 block 是 False,忽略timeout参数
若此时队列已满,则抛出 Full 异常;
若此时队列未满,则立即把元素保存到底层数据结构中;
如果 block 是 True
若 timeout 是 None 时,那么put操作可能会阻塞,直到队列中有空闲的空间(默认);
若 timeout 是非负数,则会阻塞相应时间直到队列中有剩余空间,在这个期间,如果队列中一直没有空间,抛出 Full 异常;
处理好参数逻辑后,,将元素保存到底层数据结构中,并递增unfinished_tasks,同时通知 not_empty ,唤醒在其中等待数据的线程。
出队操作:
class Queue: ... def get(self, block=True, timeout=None): with self.not_empty: if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError(""timeout" must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() self.not_full.notify() return item def _get(self): return self.queue.popleft()
get() 操作是 put() 相反的操作,代码块也及其相似,get() 是从队列中移除最先插入的元素并将其返回。
如果 block 是 False,忽略timeout参数
若此时队列没有元素,则抛出 Empty 异常;
若此时队列由元素,则立即把元素保存到底层数据结构中;
如果 block 是 True
若 timeout 是 None 时,那么get操作可能会阻塞,直到队列中有元素(默认);
若 timeout 是非负数,则会阻塞相应时间直到队列中有元素,在这个期间,如果队列中一直没有元素,则抛出 Empty 异常;
最后,通过 self.queue.popleft() 将最早放入队列的元素移除,并通知 not_full ,唤醒在其中等待数据的线程。
这里有个值得注意的地方,在 put() 操作中递增了 self.unfinished_tasks ,而 get() 中却没有递减,这是为什么?
这其实是为了留给用户一个消费元素的时间,get() 仅仅是获取元素,并不代表消费者线程处理的该元素,用户需要调用 task_done() 来通知队列该任务处理完成了:
class Queue: ... def task_done(self): with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: # 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常 raise ValueError("task_done() called too many times") self.all_tasks_done.notify_all() # 当unfinished为0时,会通知all_tasks_done self.unfinished_tasks = unfinished def join(self): with self.all_tasks_done: while self.unfinished_tasks: # 如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait()
由于 task_done() 使用方调用的,当 task_done() 次数大于 put() 次数时会抛出异常。
task_done() 操作的作用是唤醒正在阻塞的 join() 操作。join() 方法会一直阻塞,直到队列中所有的元素都被取出,并被处理了(和线程的join方法类似)。也就是说 join() 方法必须配合 task_done() 来使用才行。
LIFO 后进先出队列LifoQueue使用后进先出顺序,与栈结构相似:
class LifoQueue(Queue): """Variant of Queue that retrieves most recently added entries first.""" def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
这就是 LifoQueue 全部代码了,这正是 Queue 设计很棒的一个原因,它将底层的数据操作抽象成四个操作函数,本身来处理线程安全的问题,使得其子类只需关注底层的操作。
LifoQueue 底层数据结构改用 list 来存放,通过 self.queue.pop() 就能将 list 中最后一个元素移除,无需重置索引。
PriorityQueue 优先队列from heapq import heappush, heappop class PriorityQueue(Queue): """Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). """ def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): heappush(self.queue, item) def _get(self): return heappop(self.queue)
优先队列使用了 heapq 模块的结构,也就是最小堆的结构。优先队列更为常用,队列中项目的处理顺序需要基于这些项目的特征,一个简单的例子:
import queue class A: def __init__(self, priority, value): self.priority = priority self.value = value def __lt__(self, other): return self.priority < other.priority q = queue.PriorityQueue() q.put(A(1, "a")) q.put(A(0, "b")) q.put(A(1, "c")) print(q.get().value) # "b"
使用优先队列的时候,需要定义 __lt__ 魔术方法,来定义它们之间如何比较大小。若元素的 priority 相同,依然使用先进先出的顺序。
参考https://pymotw.com/3/queue/in...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/43547.html
摘要:介绍今天花了近乎一天的时间研究关于多线程的问题,查看了大量源码自己也实践了一个生产消费者模型,所以把一天的收获总结一下。提供了两个模块和来支持的多线程操作。使用来阻塞线程。 介绍 今天花了近乎一天的时间研究python关于多线程的问题,查看了大量源码 自己也实践了一个生产消费者模型,所以把一天的收获总结一下。 由于GIL(Global Interpreter Lock)锁的关系,纯的p...
摘要:消息队列的接受消息队列的接受是利用函数,其中是消息的类型,该参数会取出指定类型的消息,如果设定的是争抢模式,该值会统一为,否则该值就是消息发送目的的。环形队列的消息入队发送消息首先要确定环形队列的队尾。取模操作可以优化 前言 swoole 的底层队列有两种:进程间通信 IPC 的消息队列 swMsgQueue,与环形队列 swRingQueue。IPC 的消息队列用于 task_wor...
摘要:对线程池的研究是之前对分析的附加工作。在之前对源码分析的文章中,写到调度器将任务放入线程池的函数这里分析的线程池类是,也就是上述代码中所使用的类。 对Python线程池的研究是之前对Apshceduler分析的附加工作。 在之前对Apshceduler源码分析的文章中,写到调度器将任务放入线程池的函数 def _do_submit_job(self, job, run_time...
摘要:默认值为,指定为时代表可以阻塞,若同时指定,在超时时返回。当消费者线程调用意味着有消费者取得任务并完成任务,未完成的任务数就会减少。当未完成的任务数降到,解除阻塞。 学习契机 最近的一个项目中在使用grpc时遇到一个问题,由于client端可多达200,每个端口每10s向grpc server发送一次请求,server端接受client的请求后根据request信息更新数据库,再将数据...
摘要:最近稿定设计这个站点挺火,设计组的大哥一直在提,啊,这个好,这个好。目的是给设计组大哥提供素材参考,毕竟做设计的可不能抄袭哦思路枯竭的时候,借鉴一下还凑合。看了一眼设计大哥的头发,我觉得够他用一年了。 ...
阅读 1441·2021-11-24 09:39
阅读 1749·2021-11-22 15:25
阅读 3665·2021-11-19 09:40
阅读 3250·2021-09-22 15:31
阅读 1262·2021-07-29 13:49
阅读 1173·2019-08-26 11:59
阅读 1288·2019-08-26 11:39
阅读 904·2019-08-26 11:00