资讯专栏INFORMATION COLUMN

Python线程池源码分析

ephererid / 2352人阅读

摘要:对线程池的研究是之前对分析的附加工作。在之前对源码分析的文章中,写到调度器将任务放入线程池的函数这里分析的线程池类是,也就是上述代码中所使用的类。

对Python线程池的研究是之前对Apshceduler分析的附加工作。

在之前对Apshceduler源码分析的文章中,写到调度器将任务放入线程池的函数

    def _do_submit_job(self, job, run_times):
        def callback(f):
            exc, tb = (f.exception_info() if hasattr(f, "exception_info") else
                       (f.exception(), getattr(f.exception(), "__traceback__", None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

这里分析的线程池类是concurrent.futures.ThreadPoolExecutor,也就是上述代码中self._pool所使用的类。先上self._pool.submit函数的代码,再做详细分析

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError("cannot schedule new futures after shutdown")

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

f和w是两个非常重要的变量,f作为submit返回的对象,submit函数的调用者可以对其添加回调,待fn执行完成后,会在当前线程执行,具体是如何实现的,这里先不说,下面再详细分析;w则是封装了线程需要执行的方法和参数,通过self._work_queue.put(w)方法放入一个队列当中。

self._adjust_thread_count()方法则是检查当前线程池的线程数量,如果小于设定的最大值,就开辟一个线程,代码就不上了,直接看这些个线程都是干嘛的

def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical("Exception in worker", exc_info=True)

这些线程就是一个死循环,不断的从任务队列中获取到_WorkItem,然后通过其封装方法,执行我们需要的任务。如果取到的任务为None,就往队列中再放入一个None,以通知其它线程结束,然后结束当前循环。

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

如果没有异常,执行结束后,会执行之前我们说的回调。在self.future.set_result(result)方法中会执行任务回调,当然了,是在当前线程中。如果需要写入数据库之类的操作,不建议在回调中直接写入。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40997.html

相关文章

  • Python 中的 MySQL 数据库连接

    摘要:说多了都是泪,我之前排查内存泄漏的问题,超高并发的程序跑了个月后就崩溃。以前写中间件的时候,就总是把用户当,要尽量考虑各种情况避免内存泄漏。 从 Java 到 Python 本文为我和同事的共同研究成果 当跨语言的时候,有些东西在一门语言中很常见,但到了另一门语言中可能会很少见。 例如 C# 中,经常会关注拆箱装箱,但到了 Java 中却发现,根本没人关注这个。 后来才知道,原来是因为...

    Paul_King 评论0 收藏0
  • 爬虫框架Webmagic源码分析之Spider

    摘要:获取正在运行的线程数,用于状态监控。之后初始化组件主要是初始化线程池将到中,初始化开始时间等。如果线程池中运行线程数量为,并且默认,那么就停止退出,结束爬虫。 本系列文章,针对Webmagic 0.6.1版本 一个普通爬虫启动代码 public static void main(String[] args) { Spider.create(new GithubRepoPageP...

    邹立鹏 评论0 收藏0
  • 那些年我看过的书 —— 致敬我的大学生活 —— Say Good Bye !

    摘要:开头正式开启我入职的里程,现在已是工作了一个星期了,这个星期算是我入职的过渡期,算是知道了学校生活和工作的差距了,总之,尽快习惯这种生活吧。当时是看的廖雪峰的博客自己也用做爬虫写过几篇博客,不过有些是在前人的基础上写的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 开头 2017.08.21 正式开启我...

    xiaoqibTn 评论0 收藏0

发表评论

0条评论

ephererid

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<