摘要:对线程池的研究是之前对分析的附加工作。在之前对源码分析的文章中,写到调度器将任务放入线程池的函数这里分析的线程池类是,也就是上述代码中所使用的类。
对Python线程池的研究是之前对Apshceduler分析的附加工作。
在之前对Apshceduler源码分析的文章中,写到调度器将任务放入线程池的函数
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, "exception_info") else (f.exception(), getattr(f.exception(), "__traceback__", None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
这里分析的线程池类是concurrent.futures.ThreadPoolExecutor,也就是上述代码中self._pool所使用的类。先上self._pool.submit函数的代码,再做详细分析
def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError("cannot schedule new futures after shutdown") f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f
f和w是两个非常重要的变量,f作为submit返回的对象,submit函数的调用者可以对其添加回调,待fn执行完成后,会在当前线程执行,具体是如何实现的,这里先不说,下面再详细分析;w则是封装了线程需要执行的方法和参数,通过self._work_queue.put(w)方法放入一个队列当中。
self._adjust_thread_count()方法则是检查当前线程池的线程数量,如果小于设定的最大值,就开辟一个线程,代码就不上了,直接看这些个线程都是干嘛的
def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical("Exception in worker", exc_info=True)
这些线程就是一个死循环,不断的从任务队列中获取到_WorkItem,然后通过其封装方法,执行我们需要的任务。如果取到的任务为None,就往队列中再放入一个None,以通知其它线程结束,然后结束当前循环。
def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result)
如果没有异常,执行结束后,会执行之前我们说的回调。在self.future.set_result(result)方法中会执行任务回调,当然了,是在当前线程中。如果需要写入数据库之类的操作,不建议在回调中直接写入。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/40997.html
摘要:说多了都是泪,我之前排查内存泄漏的问题,超高并发的程序跑了个月后就崩溃。以前写中间件的时候,就总是把用户当,要尽量考虑各种情况避免内存泄漏。 从 Java 到 Python 本文为我和同事的共同研究成果 当跨语言的时候,有些东西在一门语言中很常见,但到了另一门语言中可能会很少见。 例如 C# 中,经常会关注拆箱装箱,但到了 Java 中却发现,根本没人关注这个。 后来才知道,原来是因为...
摘要:获取正在运行的线程数,用于状态监控。之后初始化组件主要是初始化线程池将到中,初始化开始时间等。如果线程池中运行线程数量为,并且默认,那么就停止退出,结束爬虫。 本系列文章,针对Webmagic 0.6.1版本 一个普通爬虫启动代码 public static void main(String[] args) { Spider.create(new GithubRepoPageP...
摘要:开头正式开启我入职的里程,现在已是工作了一个星期了,这个星期算是我入职的过渡期,算是知道了学校生活和工作的差距了,总之,尽快习惯这种生活吧。当时是看的廖雪峰的博客自己也用做爬虫写过几篇博客,不过有些是在前人的基础上写的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 开头 2017.08.21 正式开启我...
阅读 3562·2021-11-25 09:43
阅读 2568·2021-11-18 13:11
阅读 2140·2019-08-30 15:55
阅读 3253·2019-08-26 11:58
阅读 2801·2019-08-26 10:47
阅读 2165·2019-08-26 10:20
阅读 1246·2019-08-23 17:59
阅读 2958·2019-08-23 15:54