资讯专栏INFORMATION COLUMN

tornado 源码之 coroutine 分析

NicolasHe / 1819人阅读

摘要:源码之分析的协程原理分析版本为支持异步,实现了一个协程库。提供了回调函数注册当异步事件完成后,调用注册的回调中间结果保存结束结果返回等功能注册回调函数,当被解决时,改回调函数被调用。相当于唤醒已经处于状态的父协程,通过回调函数,再执行。

tornado 源码之 coroutine 分析
tornado 的协程原理分析  
版本:4.3.0

为支持异步,tornado 实现了一个协程库。

tornado 实现的协程框架有下面几个特点:

支持 python 2.7,没有使用 yield from
特性,纯粹使用 yield 实现

使用抛出异常的方式从协程返回值

采用 Future 类代理协程(保存协程的执行结果,当携程执行结束时,调用注册的回调函数)

使用 IOLoop 事件循环,当事件发生时在循环中调用注册的回调,驱动协程向前执行

由此可见,这是 python 协程的一个经典的实现。

本文将实现一个类似 tornado 实现的基础协程框架,并阐述相应的原理。

外部库

使用 time 来实现定时器回调的时间计算。
bisect 的 insort 方法维护一个时间有限的定时器队列。
functools 的 partial 方法绑定函数部分参数。
使用 backports_abc 导入 Generator 来判断函数是否是生成器。

import time
import bisect
import functools
from backports_abc import Generator as GeneratorType
Future
是一个穿梭于协程和调度器之间的信使。  
提供了回调函数注册(当异步事件完成后,调用注册的回调)、中间结果保存、结束结果返回等功能

add_done_callback 注册回调函数,当 Future 被解决时,改回调函数被调用。
set_result 设置最终的状态,并且调用已注册的回调函数

协程中的每一个 yield 对应一个协程,相应的对应一个 Future 对象,譬如:

@coroutine
def routine_main():
    yield routine_simple()

    yield sleep(1)

这里的 routine_simple() 和 sleep(1) 分别对应一个协程,同时有一个 Future 对应。

class Future(object):
    def __init__(self):
        self._done = False
        self._callbacks = []
        self._result = None

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)
        self._callbacks = None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def result(self):
        return self._result
IOLoop

这里的 IOLoop 去掉了 tornado 源代码中 IO 相关部分,只保留了基本需要的功能,如果命名为 CoroutineLoop 更贴切。

这里的 IOLoop 提供基本的回调功能。它是一个线程循环,在循环中完成两件事:

检测有没有注册的回调并执行

检测有没有到期的定时器回调并执行

程序中注册的回调事件,最终都会在此处执行。
可以认为,协程程序本身、协程的驱动程序 都会在此处执行。
协程本身使用 wrapper 包装,并最后注册到 IOLoop 的事件回调,所以它的从预激到结束的代码全部在 IOLoop 回调中执行。
而协程预激后,会把 Runner.run() 函数注册到 IOLoop 的事件回调,以驱动协程向前运行。

理解这一点对于理解协程的运行原理至关重要。

这就是单线程异步的基本原理。因为都在一个线程循环中执行,我们可以不用处理多线程需要面对的各种繁琐的事情。

IOLoop.start

事件循环,回调事件和定时器事件在循环中调用。

IOLoop.run_sync

执行一个协程。

将 run 注册进全局回调,在 run 中调用 func()启动协程。
注册协程结束回调 stop, 退出 run_sync 的 start 循环,事件循环随之结束。

class IOLoop(object):,
    def __init__(self):
        self._callbacks = []
        self._timers = []
        self._running = False

    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_future(self, future, callback):
        future.add_done_callback(
            lambda future: self.add_callback(functools.partial(callback, future)))

    def add_timeout(self, when, callback):
        bisect.insort(self._timers, (when, callback))

    def call_later(self, delay, callback):
        return self.add_timeout(time.time() + delay, callback)

    def add_callback(self, call_back):
        self._callbacks.append(call_back)

    def start(self):
        self._running = True
        while self._running:

            # 回调任务
            callbacks = self._callbacks
            self._callbacks = []
            for call_back in callbacks:
                call_back()

            # 定时器任务
            while self._timers and self._timers[0][0] < time.time():
                task = self._timers[0][1]
                del self._timers[0]
                task()

    def stop(self):
        self._running = False

    def run_sync(self, func):
        future_cell = [None]

        def run():
            try:
                future_cell[0] = func()
            except Exception:
                pass

            self.add_future(future_cell[0], lambda future: self.stop())

        self.add_callback(run)

        self.start()
        return future_cell[0].result()
coroutine

协程装饰器。
协程由 coroutine 装饰,分为两类:

含 yield 的生成器函数

无 yield 语句的普通函数

装饰协程,并通过注册回调驱动协程运行。
程序中通过 yield coroutine_func() 方式调用协程。
此时,wrapper 函数被调用:

获取协程生成器

如果是生成器,则

调用 next() 预激协程

实例化 Runner(),驱动协程

如果是普通函数,则

调用 set_result() 结束协程

协程返回 Future 对象,供外层的协程处理。外部通过操作该 Future 控制协程的运行。
每个 yield 对应一个协程,每个协程拥有一个 Future 对象。

外部协程获取到内部协程的 Future 对象,如果内部协程尚未结束,将 Runner.run() 方法注册到 内部协程的 Future 的结束回调。
这样,在内部协程结束时,会调用注册的 run() 方法,从而驱动外部协程向前执行。

各个协程通过 Future 形成一个链式回调关系。

Runner 类在下面多带带小节描述。

def coroutine(func):
    return _make_coroutine_wrapper(func)

# 每个协程都有一个 future, 代表当前协程的运行状态
def _make_coroutine_wrapper(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            return future
        else:
            if isinstance(result, GeneratorType):
                try:
                    yielded = next(result)
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    pass
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper
协程返回值

因为没有使用 yield from,协程无法直接返回值,所以使用抛出异常的方式返回。

python 2 无法在生成器中使用 return 语句。但是生成器中抛出的异常可以在外部 send() 语句中捕获。
所以,使用抛出异常的方式,将返回值存储在异常的 value 属性中,抛出。外部使用诸如:

try:
    yielded = gen.send(value)
except Return as e:

这样的方式获取协程的返回值。

class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        self.args = (value,)
Runner

Runner 是协程的驱动器类。

self.result_future 保存当前协程的状态。
self.future 保存 yield 子协程传递回来的协程状态。
从子协程的 future 获取协程运行结果 send 给当前协程,以驱动协程向前执行。

注意,会判断子协程返回的 future
如果 future 已经 set_result,代表子协程运行结束,回到 while Ture 循环,继续往下执行下一个 send;
如果 future 未 set_result,代表子协程运行未结束,将 self.run 注册到子协程结束的回调,这样,子协程结束时会调用 self.run,重新驱动协程执行。

如果本协程 send() 执行过程中,捕获到 StopIteration 或者 Return 异常,说明本协程执行结束,设置 result_future 的协程返回值,此时,注册的回调函数被执行。这里的回调函数为本协程的父协程所注册的 run()。
相当于唤醒已经处于 yiled 状态的父协程,通过 IOLoop 回调 run 函数,再执行 send()。

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.io_loop = IOLoop.instance()
        self.running = False
        self.future = None

        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        try:
            self.running = True
            while True:

                try:
                    # 每一个 yield 处看做一个协程,对应一个 Future
                    # 将该协程的结果 send 出去
                    # 这样外层形如  ret = yiled coroutine_func() 能够获取到协程的返回数据
                    value = self.future.result()
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    # 协程执行完成,不再注册回调
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    return
                except Exception:
                    return
                # 协程未执行结束,继续使用 self.run() 进行驱动
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():
            # 给 future 增加执行结束回调函数,这样,外部使用 future.set_result 时会调用该回调
            # 而该回调是把 self.run() 注册到 IOLoop 的事件循环
            # 所以,future.set_result 会把 self.run() 注册到 IOLoop 的事件循环,从而在下一个事件循环中调用
            self.io_loop.add_future(
                self.future, lambda f: self.run())
            return False
        return True
sleep

sleep 是一个延时协程,充分展示了协程的标准实现。

创建一个 Future,并返回给外部协程;

外部协程发现是一个未完的状态,将 run()注册到 Future 的完成回调,同时外部协程被挂起;

在设置的延时后,IOLoop 会回调 set_result 结束协程;

IOLoop 调用 run() 函数;

IOLoop 调用 send(),唤醒挂起的外部协程。

流程如下图:

def sleep(duration):
    f = Future()
    IOLoop.instance().call_later(duration, lambda: f.set_result(None))
    return f
运行
@coroutine
def routine_ur(url, wait):
    yield sleep(wait)
    print("routine_ur {} took {}s to get!".format(url, wait))


@coroutine
def routine_url_with_return(url, wait):
    yield sleep(wait)
    print("routine_url_with_return {} took {}s to get!".format(url, wait))
    raise Return((url, wait))

# 非生成器协程,不会为之生成多带带的 Runner()
# coroutine 运行结束后,直接返回一个已经执行结束的 future
@coroutine
def routine_simple():
    print("it is simple routine")

@coroutine
def routine_simple_return():
    print("it is simple routine with return")
    raise Return("value from routine_simple_return")

@coroutine
def routine_main():
    yield routine_simple()

    yield routine_ur("url0", 1)

    ret = yield routine_simple_return()
    print(ret)

    ret = yield routine_url_with_return("url1", 1)
    print(ret)

    ret = yield routine_url_with_return("url2", 2)
    print(ret)


if __name__ == "__main__":
    IOLoop.instance().run_sync(routine_main)

运行输出为:

it is simple routine
routine_ur url0 took 1s to get!
it is simple routine with return
value from routine_simple_return
routine_url_with_return url1 took 1s to get!
("url1", 1)
routine_url_with_return url2 took 2s to get!
("url2", 2)

可以观察到协程 sleep 已经生效。

源码

simple_coroutine.py

copyright

author:bigfish
copyright: 许可协议 知识共享署名-非商业性使用 4.0 国际许可协议

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/45036.html

相关文章

  • Tornado Demo chatdemo 不完全解读

    摘要:清楚了以上流程,我们直接来看函数主要用作初始化应用监听端口以及启动。其中就是保存聊天室所有聊天消息的结构。关于的解读我会放到阅读源码时讲。然后把消息加到缓存里,如果缓存大于限制则取最新的条消息。 tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo 首先看 chatdemo 的目录结构: ├── chatdemo.py ├── ...

    TesterHome 评论0 收藏0
  • tornado 源码阅读-初步认识

    摘要:序言最近闲暇无事阅读了一下的源码对整体的结构有了初步认识与大家分享不知道为什么右边的目录一直出不来非常不舒服不如移步到吧是的核心模块也是个调度模块各种异步事件都是由他调度的所以必须弄清他的执行逻辑源码分析而的核心部分则是这个循环内部的逻辑贴 序言 最近闲暇无事,阅读了一下tornado的源码,对整体的结构有了初步认识,与大家分享 不知道为什么右边的目录一直出不来,非常不舒服. 不如移...

    2450184176 评论0 收藏0
  • tornado 源码分析 异步io的实现方式

    摘要:前言本文将尝试详细的带大家一步步走完一个异步操作从而了解是如何实现异步的其实本文是对上一篇文的实践和复习主旨在于关注异步的实现所以会忽略掉代码中的一些异常处理文字较多凑合下吧接下来只会贴出部分源码帮助理解希望有耐心的同学打开源码一起跟踪一遍 前言 本文将尝试详细的带大家一步步走完一个异步操作,从而了解tornado是如何实现异步io的. 其实本文是对[上一篇文][1]的实践和复习 主...

    xiangzhihong 评论0 收藏0
  • 实践,用tornado实现自定义协议server

    摘要:前言俗话说光说不练假把式上一篇文里都只是光看着别人的源码说貌似有点纸上谈兵的意思所以这次写一个简单的自己定义协议的既可以熟悉和的用法又可以在去除了复杂的协议后了解的工作原理代码不多加上空行和也就行不到在上的源码点这里目标定义一个简单的协议达 前言 俗话说光说不练假把式,上一篇文里都只是光看着别人的源码说,貌似有点纸上谈兵的意思. 所以这次写一个简单的,自己定义协议的server. 既...

    wizChen 评论0 收藏0
  • 谈谈项目的重构与测试

    这篇文章摘自我的博客, 欢迎大家没事去逛逛~ 背景 这几个月我开发了公司里的一个restful webservice,起初技术选型的时候是采用了flask框架。虽然flask是一个同步的框架,但是可以配合gevent或者其它方式运行在异步的容器中(测试链接),效果看上去也还可以,因此就采用了这种方式。 后面阅读了tornado的源码,也去了解了各种协程框架以及运行的原理。总感觉flask的这种同步...

    Lavender 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<