摘要:上一篇我们介绍了包,以及如何使用异步编程管理网络应用中的高并发。倒排索引保存在本地一个名为的文件中。运行示例如下这个模块没有使用并发,主要作用是为使用包编写的服务器提供支持。
async/await语法asyncio 上一篇我们介绍了 asyncio 包,以及如何使用异步编程管理网络应用中的高并发。在这一篇,我们主要介绍使用 asyncio 包编程的两个例子。
我们先介绍下 async/await 语法,要不然看完这篇可能会困惑,为什么之前使用 asyncio.coroutine 装饰器 和 yield from,这里都是 用的 async 和 await?
python并发2:使用asyncio处理并发
async/await 是Python3.5 的新语法,语法如下:
async def read_data(db): pass
async 是明确将函数声明为协程的关键字,即使没有await表达式,函数执行也会返回一个协程对象。
在协程函数内部,可以在某个表达式之前使用 await 关键字来暂停协程的执行,以等待某协程完成:
async def read_data(db): data = await db.fetch("SELECT ...")
这个代码如果使用 asyncio.coroutine 装饰器语法为:
@asyncio.coroutine def read_data(db): data = yield from db.fetch("SELECT ...")
这两段代码执行的结果是一样的,也就是说 可以把 asyncio.coroutine 替换为 async, yield from 替换为 await。
使用新的语法有什么好处呢:
使生成器和协程的概念更容易理解,因为语法不同
可以消除由于重构时不小心移出协程中yield 声明而导致的不明确错误,这回导致协程变成普通的生成器。
使用 asyncio 包编写服务器这个例子主要是使用 asyncio 包 和 unicodedata 模块,实现通过规范名称查找Unicode 字符。
我们先来看一下代码:
# charfinder.py import sys import re import unicodedata import pickle import warnings import itertools import functools from collections import namedtuple RE_WORD = re.compile("w+") RE_UNICODE_NAME = re.compile("^[A-Z0-9 -]+$") RE_CODEPOINT = re.compile("U+[0-9A-F]{4, 6}") INDEX_NAME = "charfinder_index.pickle" MINIMUM_SAVE_LEN = 10000 CJK_UNI_PREFIX = "CJK UNIFIED IDEOGRAPH" CJK_CMP_PREFIX = "CJK COMPATIBILITY IDEOGRAPH" sample_chars = [ "$", # DOLLAR SIGN "A", # LATIN CAPITAL LETTER A "a", # LATIN SMALL LETTER A "u20a0", # EURO-CURRENCY SIGN "u20ac", # EURO SIGN ] CharDescription = namedtuple("CharDescription", "code_str char name") QueryResult = namedtuple("QueryResult", "count items") def tokenize(text): """ :param text: :return: return iterable of uppercased words """ for match in RE_WORD.finditer(text): yield match.group().upper() def query_type(text): text_upper = text.upper() if "U+" in text_upper: return "CODEPOINT" elif RE_UNICODE_NAME.match(text_upper): return "NAME" else: return "CHARACTERS" class UnicodeNameIndex: # unicode name 索引类 def __init__(self, chars=None): self.load(chars) def load(self, chars=None): # 加载 unicode name self.index = None if chars is None: try: with open(INDEX_NAME, "rb") as fp: self.index = pickle.load(fp) except OSError: pass if self.index is None: self.build_index(chars) if len(self.index) > MINIMUM_SAVE_LEN: try: self.save() except OSError as exc: warnings.warn("Could not save {!r}: {}" .format(INDEX_NAME, exc)) def save(self): with open(INDEX_NAME, "wb") as fp: pickle.dump(self.index, fp) def build_index(self, chars=None): if chars is None: chars = (chr(i) for i in range(32, sys.maxunicode)) index = {} for char in chars: try: name = unicodedata.name(char) except ValueError: continue if name.startswith(CJK_UNI_PREFIX): name = CJK_UNI_PREFIX elif name.startswith(CJK_CMP_PREFIX): name = CJK_CMP_PREFIX for word in tokenize(name): index.setdefault(word, set()).add(char) self.index = index def word_rank(self, top=None): # (len(self.index[key], key) 是一个生成器,需要用list 转成列表,要不然下边排序会报错 res = [list((len(self.index[key], key)) for key in self.index)] res.sort(key=lambda item: (-item[0], item[1])) if top is not None: res = res[:top] return res def word_report(self, top=None): for postings, key in self.word_rank(top): print("{:5} {}".format(postings, key)) def find_chars(self, query, start=0, stop=None): stop = sys.maxsize if stop is None else stop result_sets = [] for word in tokenize(query): # tokenize 是query 的生成器 a b 会是 ["a", "b"] 的生成器 chars = self.index.get(word) if chars is None: result_sets = [] break result_sets.append(chars) if not result_sets: return QueryResult(0, ()) result = functools.reduce(set.intersection, result_sets) result = sorted(result) # must sort to support start, stop result_iter = itertools.islice(result, start, stop) return QueryResult(len(result), (char for char in result_iter)) def describe(self, char): code_str = "U+{:04X}".format(ord(char)) name = unicodedata.name(char) return CharDescription(code_str, char, name) def find_descriptions(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe(char) def get_descriptions(self, chars): for char in chars: yield self.describe(char) def describe_str(self, char): return "{:7} {} {}".format(*self.describe(char)) def find_description_strs(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe_str(char) @staticmethod # not an instance method due to concurrency def status(query, counter): if counter == 0: msg = "No match" elif counter == 1: msg = "1 match" else: msg = "{} matches".format(counter) return "{} for {!r}".format(msg, query) def main(*args): index = UnicodeNameIndex() query = " ".join(args) n = 0 for n, line in enumerate(index.find_description_strs(query), 1): print(line) print("({})".format(index.status(query, n))) if __name__ == "__main__": if len(sys.argv) > 1: main(*sys.argv[1:]) else: print("Usage: {} word1 [word2]...".format(sys.argv[0]))
这个模块读取Python内建的Unicode数据库,为每个字符名称中的每个单词建立索引,然后倒排索引,存入一个字典。
例如,在倒排索引中,"SUN" 键对应的条目是一个集合,里面是名称中包含"SUN" 这个词的10个Unicode字符。倒排索引保存在本地一个名为charfinder_index.pickle 的文件中。如果查询多个单词,会计算从索引中所得集合的交集。
运行示例如下:
>>> main("rook") # doctest: +NORMALIZE_WHITESPACE U+2656 ♖ WHITE CHESS ROOK U+265C ♜ BLACK CHESS ROOK (2 matches for "rook") >>> main("rook", "black") # doctest: +NORMALIZE_WHITESPACE U+265C ♜ BLACK CHESS ROOK (1 match for "rook black") >>> main("white bishop") # doctest: +NORMALIZE_WHITESPACE U+2657 ♗ WHITE CHESS BISHOP (1 match for "white bishop") >>> main("jabberwocky"s vest") (No match for "jabberwocky"s vest")
这个模块没有使用并发,主要作用是为使用 asyncio 包编写的服务器提供支持。
下面我们来看下 tcp_charfinder.py 脚本:
# tcp_charfinder.py import sys import asyncio # 用于构建索引,提供查询方法 from charfinder import UnicodeNameIndex CRLF = b" " PROMPT = b"?> " # 实例化UnicodeNameIndex 类,它会使用charfinder_index.pickle 文件 index = UnicodeNameIndex() async def handle_queries(reader, writer): # 这个协程要传给asyncio.start_server 函数,接收的两个参数是asyncio.StreamReader 对象和 asyncio.StreamWriter 对象 while True: # 这个循环处理会话,直到从客户端收到控制字符后退出 writer.write(PROMPT) # can"t await! # 这个方法不是协程,只是普通函数;这一行发送 ?> 提示符 await writer.drain() # must await! # 这个方法刷新writer 缓冲;因为它是协程,所以要用 await data = await reader.readline() # 这个方法也是协程,返回一个bytes对象,也要用await try: query = data.decode().strip() except UnicodeDecodeError: # Telenet 客户端发送控制字符时,可能会抛出UnicodeDecodeError异常 # 我们这里默认发送空字符 query = "x00" client = writer.get_extra_info("peername") # 返回套接字连接的远程地址 print("Received from {}: {!r}".format(client, query)) # 在控制台打印查询记录 if query: if ord(query[:1]) < 32: # 如果收到控制字符或者空字符,退出循环 break # 返回一个生成器,产出包含Unicode 码位、真正的字符和字符名称的字符串 lines = list(index.find_description_strs(query)) if lines: # 使用默认的UTF-8 编码把lines 转换成bytes 对象,并在每一行末添加回车符合换行符 # 参数列表是一个生成器 writer.writelines(line.encode() + CRLF for line in lines) writer.write(index.status(query, len(lines)).encode() + CRLF) # 输出状态 await writer.drain() # 刷新输出缓冲 print("Sent {} results".format(len(lines))) # 在服务器控制台记录响应 print("Close the client socket") # 在控制台记录会话结束 writer.close() # 关闭StreamWriter流 def main(address="127.0.0.1", port=2323): # 添加默认地址和端口,所以调用默认可以不加参数 port = int(port) loop = asyncio.get_event_loop() # asyncio.start_server 协程运行结束后, # 返回的协程对象返回一个asyncio.Server 实例,即一个TCP套接字服务器 server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro) # 驱动server_coro 协程,启动服务器 host = server.sockets[0].getsockname() # 获得这个服务器的第一个套接字的地址和端口 print("Serving on {}. Hit CTRL-C to stop.".format(host)) # 在控制台中显示地址和端口 try: loop.run_forever() # 运行事件循环 main 函数在这里阻塞,直到服务器的控制台中按CTRL-C 键 except KeyboardInterrupt: # CTRL+C pressed pass print("Server shutting down.") server.close() # server.wait_closed返回一个 future # 调用loop.run_until_complete 方法,运行 future loop.run_until_complete(server.wait_closed()) loop.close() # 终止事件循环 if __name__ == "__main__": main(*sys.argv[1:])
运行 tcp_charfinders.py
python tcp_charfinders.py
打开终端,使用 telnet 命令请求服务,运行结果如下所示:
main 函数几乎会立即显示 Serving on... 消息,然后在调用loop.run_forever() 方法时阻塞。这时,控制权流动到事件循环中,而且一直等待,偶尔会回到handle_queries 协程,这个协程需要等待网络发送或接收数据时,控制权又交给事件循环。
handle_queries 协程可以处理多个客户端发来的多次请求。只要有新客户端连接服务器,就会启动一个handle_queries 协程实例。
handle_queries 的I/O操作都是使用bytes格式。我们从网络得到的数据要解码,发出去的数据也要编码
asyncio包提供了高层的流API,提供了现成的服务器,我们只需要实现一个处理程序。详细信息可以查看文档:https://docs.python.org/3/library/asyncio-stream.html
虽然,asyncio包提供了服务器,但是功能相对来说还是比较简陋的,现在我们使用一下 基于asyncio包的 web 框架 sanci,用它来实现一个http版的简易服务器
使用 sanic 包编写web 服务器sanic 的简单入门在上一篇文章有介绍,python web 框架 Sanci 快速入门
Sanic 是一个和类Flask 的基于Python3.5+的web框架,提供了比较高阶的API,比如路由、request参数,response等,我们只需要实现处理逻辑即可。
下边是使用 sanic 实现的简易的 字符查询http web 服务:
from sanic import Sanic from sanic import response from charfinder import UnicodeNameIndex app = Sanic() index = UnicodeNameIndex() html_temp = "{char}
" @app.route("/charfinder") # app.route 函数的第一个参数是url path,我们这里指定路径是charfinder async def charfinder(request): # request.args 可以取到url 的查询参数 # ?key1=value1&key2=value2 的结果是 {"key1": ["value1"], "key2": ["value2"]} # 我们这里支持传入多个查询参数,所以这里使用 request.args.getlist("char") # 如果我们 使用 request.args.get("char") 只能取到第一个参数 query = request.args.getlist("char") query = " ".join(query) lines = list(index.find_description_strs(query)) # 将得到的结果生成html html = " ".join([html_temp.format(char=line) for line in lines]) return response.html(html) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000) # 设置服务器运行地址和端口号
对比两段代码可以发现,使用 sanic 非常简单。
运行服务:
python http_charsfinder.py
我们在浏览器输入地址 http://0.0.0.0:8000/charfinde... 结果示例如下
现在对比下两段代码在TCP 的示例中,服务器通过main函数下的这两行代码创建并排定运行时间:
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro)
而在sanic的HTTP示例中,使用,创建服务器:
app.run(host="0.0.0.0", port=8000)
这两个看起来运行方式完全不同,但如果我们翻开sanic的源码会看到 app.run() 内部是调用 的 server_coroutine = loop.create_server()创建服务器,
server_coroutine 是通过 loop.run_until_complete()驱动的。
所以说,为了启动服务器,这两个都是由 loop.run_until_complete 驱动,完成运行的。只不过 sanic 封装了run 方法,使得使用更加方便。
这里可以得到一个基本事实:只有驱动协程,协程才能做事,而驱动 asyncio.coroutine 装饰的协程有两种方式,使用 yield from 或者传给asyncio 包中某个参数为协程或future的函数,例如 run_until_complete
现在如果你搜索 cjk,会得到7万多条数据3M 的一个html文件,耗时大约2s,这如果是生产服务的一个请求,耗时2s是不能接收的,我们可以使用分页,这样我们可以每次只取200条数据,当用户想看更多数据时再使用 ajax 或者 websockets发送下一批数据。
这一篇我们使用 asyncio 包实现了TCP服务器,使用sanic(基于asyncio sanic 默认使用 uvloop替代asyncio)实现了HTTP服务器,用于按名称搜索Unicode 字符。但是并没有涉及服务器并发部分,这部分可以以后再讨论。
参考链接这一篇还是 《流畅的python》asyncio 一章的读书笔记,下一篇将是python并发的第三篇,《使用线程处理并发》。
Python 3.5将支持Async/Await异步编程:http://www.infoq.com/cn/news/2015/05/python-async-await
python web 框架 Sanci 快速入门
python并发2:使用asyncio处理并发
最后,感谢女朋友支持。
>欢迎关注 | >请我喝芬达 |
---|---|
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/38648.html
摘要:并发用于制定方案,用来解决可能但未必并行的问题。在协程中使用需要注意两点使用链接的多个协程最终必须由不是协程的调用方驱动,调用方显式或隐式在最外层委派生成器上调用函数或方法。对象可以取消取消后会在协程当前暂停的处抛出异常。 导语:本文章记录了本人在学习Python基础之控制流程篇的重点知识及个人心得,打算入门Python的朋友们可以来一起学习并交流。 本文重点: 1、了解asyncio...
摘要:具有以下基本同步原语子进程提供了通过创建和管理子进程的。虽然队列不是线程安全的,但它们被设计为专门用于代码。表示异步操作的最终结果。 Python的asyncio是使用 async/await 语法编写并发代码的标准库。通过上一节的讲解,我们了解了它不断变化的发展历史。到了Python最新稳定版 3.7 这个版本,asyncio又做了比较大的调整,把这个库的API分为了 高层级API和...
摘要:是之后引入的标准库的,这个包使用事件循环驱动的协程实现并发。没有能从外部终止线程,因为线程随时可能被中断。上一篇并发使用处理并发我们介绍过的,在中,只是调度执行某物的结果。 asyncio asyncio 是Python3.4 之后引入的标准库的,这个包使用事件循环驱动的协程实现并发。asyncio 包在引入标准库之前代号 Tulip(郁金香),所以在网上搜索资料时,会经常看到这种花的...
摘要:我们以请求网络服务为例,来实际测试一下加入多线程之后的效果。所以,执行密集型操作时,多线程是有用的,对于密集型操作,则每次只能使用一个线程。说到这里,对于密集型,可以使用多线程或者多进程来提高效率。 为了提高系统密集型运算的效率,我们常常会使用到多个进程或者是多个线程,python中的Threading包实现了线程,multiprocessing 包则实现了多进程。而在3.2版本的py...
摘要:创建第一个协程推荐使用语法来声明协程,来编写异步应用程序。协程两个紧密相关的概念是协程函数通过定义的函数协程对象调用协程函数返回的对象。它是一个低层级的可等待对象,表示一个异步操作的最终结果。 我们讲以Python 3.7 上的asyncio为例讲解如何使用Python的异步IO。 showImg(https://segmentfault.com/img/remote/14600000...
阅读 2935·2020-01-08 12:17
阅读 1974·2019-08-30 15:54
阅读 1135·2019-08-30 15:52
阅读 2008·2019-08-29 17:18
阅读 1019·2019-08-29 15:34
阅读 2436·2019-08-27 10:58
阅读 1848·2019-08-26 12:24
阅读 351·2019-08-23 18:23