资讯专栏INFORMATION COLUMN

PythonAsyncio生产调度基本原理详细信息

89542767 / 510人阅读

  此篇文章关键阐述了PythonAsyncio生产调度基本原理详细信息,Python.Asyncio是1个专而精的库,它包括一些功效,而跟关键生产调度有关的思路除开三类可在等待目标外,还有其他某些功效,他们各自坐落于runners.py,base_event.py,event.py3个文档中


  序言


  在本文《PythonAsyncio中Coroutines,Tasks,Future可在等待对象关联及功效》中阐述了Python的可在等待目标功效,尤其是Task目标在运行的时候也可以自我驱动,但一个Task目标只有推动1条实行链,如果想好几条链实行(高并发),也是需要EventLoop来制定推动,下面将采取Python.Asyncio库的源代码去了解EventLoop是怎样运转的。


  1.简单介绍


  Python.Asyncio是1个专而精的库,它包括一些功效,而跟关键生产调度有关的思路除开三类可在等待目标外,还有其他某些功效,他们各自坐落于runners.py,base_event.py,event.py3个文档中。


  runners.py文件有个最主要的类--Runner,它工作职责是保证进到协同程序方式的事件循环直到复位工作中,及在撤出协同程序方式时清除仍在运行内存的协同程序,制作器等目标。


  协同程序方式仅仅是为了能便于了解,对电子计算机来说,并没那样区别


  event.py文件除开储放着EventLoop对象插口及其得到和设定EventLoop的函数公式外,两个EventLoop可生产调度对象,分别是Handler和TimerHandler,他们能够称之为EvnetLoop启用其他对象器皿,用以连招待生产调度目标和事件循环之间的关系,但是它们完成比较简单,对Handler,它源代码如下所示:


  #早已移除了个别不愿关的编码
  class Handle:
  def __init__(self,callback,args,loop,context=None):
  #初始化上下文,确保执行的时候能找到Handle所在的上下文
  if context is None:
  context=contextvars.copy_context()
  self._context=context
  self._loop=loop
  self._callback=callback
  self._args=args
  self._cancelled=False
  def cancel(self):
  #设置当前Handle为取消状态
  if not self._cancelled:
  self._cancelled=True
  self._callback=None
  self._args=None
  def cancelled(self):
  return self._cancelled
  def _run(self):
  #用于执行真正的函数,且通过context.run方法来确保在自己的上下文内执行。
  try:
  #保持在自己持有的上下文中执行对应的回调
  self._context.run(self._callback,*self._args)
  except(SystemExit,KeyboardInterrupt):
  raise
  except BaseException as exc:
  cb=format_helpers._format_callback_source(
  self._callback,self._args)
  msg=f'Exception in callback{cb}'
  context={
  'message':msg,
  'exception':exc,
  'handle':self,
  }
  self._loop.call_exception_handler(context)


  通过源码可以发现,Handle功能十分简单,提供了可以被取消以及可以在自己所处的上下文执行的功能,而TimerHandle继承于Handle比Handle多了一些和时间以及排序相关的参数,源码如下:


  class TimerHandle(Handle):
  def __init__(self,when,callback,args,loop,context=None):
  super().__init__(callback,args,loop,context)
  self._when=when
  self._scheduled=False
  def __hash__(self):
  return hash(self._when)
  def __lt__(self,other):
  if isinstance(other,TimerHandle):
  return self._when<other._when
  return NotImplemented
  def __le__(self,other):
  if isinstance(other,TimerHandle):
  return self._when<other._when or self.__eq__(other)
  return NotImplemented
  def __gt__(self,other):
  if isinstance(other,TimerHandle):
  return self._when>other._when
  return NotImplemented
  def __ge__(self,other):
  if isinstance(other,TimerHandle):
  return self._when>other._when or self.__eq__(other)
  return NotImplemented
  def __eq__(self,other):
  if isinstance(other,TimerHandle):
  return(self._when==other._when and
  self._callback==other._callback and
  self._args==other._args and
  self._cancelled==other._cancelled)
  return NotImplemented
  def cancel(self):
  if not self._cancelled:
  #用于通知事件循环当前Handle已经退出了
  self._loop._timer_handle_cancelled(self)
  super().cancel()
  def when(self):
  return self._when


  通过代码可以发现,这两个对象十分简单,而我们在使用Python.Asyncio时并不会直接使用到这两个对象,而是通过loop.call_xxx系列方法来把调用封装成Handle对象,然后等待EventLoop执行。所以loop.call_xxx系列方法可以认为是EventLoop的注册操作,基本上所有非IO的异步操作都需要通过loop.call_xxx方法来把自己的调用注册到EventLoop中,比如Task对象就在初始化后通过调用loop.call_soon方法来注册到EventLoop中,loop.call_sonn的实现很简单,


  它的源码如下:


  class BaseEventLoop:
  ...
  def call_soon(self,callback,*args,context=None):
  #检查是否事件循环是否关闭,如果是则直接抛出异常
  self._check_closed()
  handle=self._call_soon(callback,args,context)
  return handle
  def _call_soon(self,callback,args,context):
  #把调用封装成一个handle,这样方便被事件循环调用
  handle=events.Handle(callback,args,self,context)
  #添加一个handle到_ready,等待被调用
  self._ready.append(handle)
  return handle


  可以看到call_soon真正相关的代码只有10几行,它负责把一个调用封装成一个Handle,并添加到self._reday中,从而实现把调用注册到事件循环之中。


  loop.call_xxx系列函数除了loop.call_soon系列函数外,还有另外两个方法--loop.call_at和loop.call_later,它们类似于loop.call_soon,不过多了一个时间参数,来告诉EventLoop在什么时间后才可以调用,同时通过loop.call_at和loop.call_later注册的调用会通过Python的堆排序模块headpq注册到self._scheduled变量中,


  具体代码如下:


  class BaseEventLoop:
  ...
  def call_later(self,delay,callback,*args,context=None):
  if delay is None:
  raise TypeError('delay must not be None')
  timer=self.call_at(self.time()+delay,callback,*args,context=context)
  return timer
  def call_at(self,when,callback,*args,context=None):
  if when is None:
  raise TypeError("when cannot be None")
  self._check_closed()
  #创建一个timer handle,然后添加到事件循环的_scheduled中,等待被调用
  timer=events.TimerHandle(when,callback,args,self,context)
  heapq.heappush(self._scheduled,timer)
  timer._scheduled=True
  return timer


  2.EventLoop的调度实现


  在文章《Python Asyncio中Coroutines,Tasks,Future可等待对象的关系及作用》中已经分析到了runner会通过loop.run_until_complete来调用mainTask从而开启EventLoop的调度,所以在分析EventLoop的调度时,应该先从loop.run_until_complete入手,


  对应的源码如下:


  class BaseEventLoop:
  def run_until_complete(self,future):
  ...
  new_task=not futures.isfuture(future)
  #把coroutine转换成task,这样事件循环就可以调度了,事件循环的最小调度单位为task
  #需要注意的是此时事件循环并没注册到全局变量中,所以需要显示的传进去,
  #同时Task对象注册的时候,已经通过loop.call_soon把自己注册到事件循环中,等待调度
  future=tasks.ensure_future(future,loop=self)
  if new_task:
  #An exception is raised if the future didn't complete,so there
  #is no need to log the"destroy pending task"message
  future._log_destroy_pending=False
  #当该task完成时,意味着当前事件循环失去了调度对象,无法继续调度,所以需要关闭当前事件循环,程序会由协程模式返回到线程模式
  future.add_done_callback(_run_until_complete_cb)
  try:
  #事件循环开始运行
  self.run_forever()
  except:
  if new_task and future.done()and not future.cancelled():
  #The coroutine raised a BaseException.Consume the exception
  #to not log a warning,the caller doesn't have access to the
  #local task.
  future.exception()
  raise
  finally:
  future.remove_done_callback(_run_until_complete_cb)
  if not future.done():
  raise RuntimeError('Event loop stopped before Future completed.')
  return future.result()
  def run_forever(self):
  #进行一些初始化工作
  self._check_closed()
  self._check_running()
  self._set_coroutine_origin_tracking(self._debug)
  self._thread_id=threading.get_ident()
  old_agen_hooks=sys.get_asyncgen_hooks()
  #通过asyncgen钩子来自动关闭asyncgen函数,这样可以提醒用户生成器还未关闭
  sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
  finalizer=self._asyncgen_finalizer_hook)
  try:
  #设置当前在运行的事件循环到全局变量中,这样就可以在任一阶段获取到当前的事件循环了
  events._set_running_loop(self)
  while True:
  #正真执行任务的逻辑
  self._run_once()
  if self._stopping:
  break
  finally:
  #关闭循环,并且清理一些资源
  self._stopping=False
  self._thread_id=None
  events._set_running_loop(None)
  self._set_coroutine_origin_tracking(False)
  sys.set_asyncgen_hooks(*old_agen_hooks)


  这段源码并不复杂,它的主要逻辑是通过把Corotinue转为一个Task对象,然后通过Task对象初始化时调用loop.call_sonn方法把自己注册到EventLoop中,最后再通过loop.run_forever中的循环代码一直运行着,直到_stopping被标记为True:


  while True:
  #正真执行任务的逻辑
  self._run_once()
  if self._stopping:
  break
  可以看出,这段代码是确保事件循环能一直执行着,自动循环结束,而真正调度的核心是_run_once函数,
  它的源码如下:
  class BaseEventLoop:
  ...
  def _run_once(self):
  #self._scheduled是一个列表,它只存放TimerHandle
  sched_count=len(self._scheduled)
  ###############################
  #第一阶段,整理self._scheduled#
  ###############################
  if(sched_count>_MIN_SCHEDULED_TIMER_HANDLES and
  self._timer_cancelled_count/sched_count>_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
  #当待调度的任务数量超过100且待取消的任务占总任务的50%时,才进入这个逻辑
  #把需要取消的任务移除
  new_scheduled=[]
  for handle in self._scheduled:
  if handle._cancelled:
  #设置handle的_cancelled为True,并且把handle从_scheduled中移除
  handle._scheduled=False
  else:
  new_scheduled.append(handle)
  #重新排列堆
  heapq.heapify(new_scheduled)
  self._scheduled=new_scheduled
  self._timer_cancelled_count=0
  else:
  #需要取消的handle不多,则只会走这个逻辑,这里会把堆顶的handle弹出,并标记为不可调度,但不会访问整个堆
  while self._scheduled and self._scheduled[0]._cancelled:
  self._timer_cancelled_count-=1
  handle=heapq.heappop(self._scheduled)
  handle._scheduled=False
  #################################
  #第二阶段,计算超时值以及等待事件IO#
  #################################
  timeout=None
  #当有准备调度的handle或者是正在关闭时,不等待,方便尽快的调度
  if self._ready or self._stopping:
  timeout=0
  elif self._scheduled:
  #Compute the desired timeout.
  #如果堆有数据时,通过堆顶的handle计算最短的超时时间,但是最多不能超过MAXIMUM_SELECT_TIMEOUT,以免超过系统限制
  when=self._scheduled[0]._when
  timeout=min(max(0,when-self.time()),MAXIMUM_SELECT_TIMEOUT)
  #事件循环等待事件,直到有事件或者超时
  event_list=self._selector.select(timeout)
  ##################################################
  #第三阶段,把满足条件的TimeHandle放入到self._ready中#
  ##################################################
  #获取得到的事件的回调,然后装填到_ready
  self._process_events(event_list)
  #把一些在self._scheduled且满足调度条件的handle放到_ready中,比如TimerHandle。
  #end_time为当前时间+一个时间单位,猜测是能多处理一些这段时间内产生的事件
  end_time=self.time()+self._clock_resolution
  while self._scheduled:
  handle=self._scheduled[0]
  if handle._when>=end_time:
  break
  handle=heapq.heappop(self._scheduled)
  handle._scheduled=False
  self._ready.append(handle)
  ################################################################################
  #第四阶段,遍历所有准备调度的handle,并且通过handle的context来执行handle对应的callback#
  ################################################################################
  ntodo=len(self._ready)
  for i in range(ntodo):
  handle=self._ready.popleft()
  #如果handle已经被取消,则不调用
  if handle._cancelled:
  continue
  if self._debug:
  try:
  self._current_handle=handle
  t0=self.time()
  handle._run()
  dt=self.time()-t0
  if dt>=self.slow_callback_duration:
  #执行太久的回调,记录下来,这些需要开发者自己优化
  logger.warning('Executing%s took%.3f seconds',
  _format_handle(handle),dt)
  finally:
  self._current_handle=None
  else:
  handle._run()
  handle=None#Needed to break cycles when an exception occurs.

  根据源码分析,能够非常明确了解生产调度逻辑性中首先要先整齐self._scheduled,在整齐的一个过程是采用希尔排序去进行的,由于希尔排序在生产调度的场景中高效率非常高,但是这一段整齐编码分为二种,我猜测是当要关闭的总数太多立即赋值效率更高一些。在整齐self._scheduled后,就进入了第2步,该流程逐渐等候系统软件事件循环回到相对应的事情,假如self._ready含有数据信息,就不一一等了,必须立刻至下三个步骤,便于能赶快分配生产调度。在获得系统软件事件循环所得到的事件之后,就进入了接着,该流程可以通过self._process_events方法解决相对应的事情,然后把事情相对应的调整储存到self._ready中,接着再赋值self._ready中所有Handle并逐个实行(实行的时候可以觉得EventLoop把管控权回到给相对应的启用逻辑性),到此一套完整的生产调度逻辑性就没有了,并进到下个生产调度逻辑性。


  3.互联网IO事件解决


  注:导致系统事件循环限制,因此材料IO通常依然采用多核来完成,实际见:github.com/python/asyn…


  在研究EventLoop生产调度完成时忽视了self._process_events的实际完成逻辑性,由于_process_events方法所属asyncio.base_event.py文件里的BaseEventLoop类并没有有实际达到的,由于互联网IO有关的需求全面的事件循环过来帮忙解决,因此与系统软件事件循环有关的思路都会asyncio.selector_events.py里的BaseSelectorEventLoop类中。BaseSelectorEventLoop类封装形式了selector模块与系统软件事件循环互动,使调用者不用去考虑到sock的建立及其sock形成的文件描述符的监视与销户等行为,下边以BaseSelectorEventLoop中内置的pipe为事例,剖析BaseSelectorEventLoop是如何开始互联网IO事故处理的。


  在分析之前,先看一个例子,代码如下:


  import asyncio
  import threading
  def task():
  print("task")
  def run_loop_inside_thread(loop):
  loop.run_forever()
  loop=asyncio.get_event_loop()
  threading.Thread(target=run_loop_inside_thread,args=(loop,)).start()
  loop.call_soon(task)
  如果直接运行这个例子,它并不会输出task(不过在IDE使用DEBUG模式下线程启动会慢一点,所以会输出的),因为在调用loop.run_forever后EventLoop会一直卡在这段逻辑中:
  event_list=self._selector.select(timeout)
  所以调用loop.call_soon并不会使EventLoop马上安排调度,而如果把call_soon换成call_soon_threadsafe则可以正常输出,这是因为call_soon_threadsafe中多了一个self._write_to_self的调用,它的源码如下:
  class BaseEventLoop:
  ...
  def call_soon_threadsafe(self,callback,*args,context=None):
  """Like call_soon(),but thread-safe."""
  self._check_closed()
  handle=self._call_soon(callback,args,context)
  self._write_to_self()
  return handle


  由于这个调用是涉及到IO相关的,所以需要到BaseSelectorEventLoop类查看,接下来以pipe相关的网络IO操作来分析EventLoop是如何处理IO事件的(只演示reader对象,writer对象操作与reader类似),


  对应的源码如下:


  class BaseSelectorEventLoop(base_events.BaseEventLoop):
  #######
  #创建#
  #######
  def __init__(self,selector=None):
  super().__init__()
  if selector is None:
  #获取最优的selector
  selector=selectors.DefaultSelector()
  self._selector=selector
  #创建pipe
  self._make_self_pipe()
  self._transports=weakref.WeakValueDictionary()
  def _make_self_pipe(self):
  #创建Pipe对应的sock
  self._ssock,self._csock=socket.socketpair()
  #设置sock为非阻塞
  self._ssock.setblocking(False)
  self._csock.setblocking(False)
  self._internal_fds+=1
  #阻塞服务端sock读事件对应的回调
  self._add_reader(self._ssock.fileno(),self._read_from_self)
  def _add_reader(self,fd,callback,*args):
  #检查事件循环是否关闭
  self._check_closed()
  #封装回调为handle对象
  handle=events.Handle(callback,args,self,None)
  try:
  key=self._selector.get_key(fd)
  except KeyError:
  #如果没有注册到系统的事件循环,则注册
  self._selector.register(fd,selectors.EVENT_READ,
  (handle,None))
  else:
  #如果已经注册过,则更新
  mask,(reader,writer)=key.events,key.data
  self._selector.modify(fd,mask|selectors.EVENT_READ,
  (handle,writer))
  if reader is not None:
  reader.cancel()
  return handle
  def _read_from_self(self):
  #负责消费sock数据
  while True:
  try:
  data=self._ssock.recv(4096)
  if not data:
  break
  self._process_self_data(data)
  except InterruptedError:
  continue
  except BlockingIOError:
  break
  #######
  #删除#
  #######
  def _close_self_pipe(self):
  #注销Pipe对应的描述符
  self._remove_reader(self._ssock.fileno())
  #关闭sock
  self._ssock.close()
  self._ssock=None
  self._csock.close()
  self._csock=None
  self._internal_fds-=1
  def _remove_reader(self,fd):
  #如果事件循环已经关闭了,就不用操作了
  if self.is_closed():
  return False
  try:
  #查询文件描述符是否在selector中
  key=self._selector.get_key(fd)
  except KeyError:
  #不存在则返回
  return False
  else:
  #存在则进入移除的工作
  mask,(reader,writer)=key.events,key.data
  #通过事件掩码判断是否有其它事件
  mask&=~selectors.EVENT_READ
  if not mask:
  #移除已经注册到selector的文件描述符
  self._selector.unregister(fd)
  else:
  #移除已经注册到selector的文件描述符,并注册新的事件
  self._selector.modify(fd,mask,(None,writer))
  #如果reader不为空,则取消reader
  if reader is not None:
  reader.cancel()
  return True
  else:
  return False
  通过源码中的创建部分可以看到,EventLoop在启动的时候会创建一对建立通信的sock,并设置为非阻塞,然后把对应的回调封装成一个Handle对象并注册到系统事件循环中(删除则进行对应的反向操作),之后系统事件循环就会一直监听对应的事件,也就是EventLoop的执行逻辑会阻塞在下面的调用中,等待事件响应:
  event_list=self._selector.select(timeout)这时如果执行loop.call_soon_threadsafe,那么会通过write_to_self写入一点信息:
  def _write_to_self(self):
  csock=self._csock
  if csock is None:
  return
  try:
  csock.send(b'')
  except OSError:
  if self._debug:
  logger.debug("Fail to write a null byte into the self-pipe socket",exc_info=True)

  由于csock被写入了数据,那么它对应的ssock就会收到一个读事件,系统事件循环在收到这个事件通知后就会把数据返回,然后EventLoop就会获得到对应的数据,并交给process_events方法进行处理,


  它的相关代码如下:


  class BaseSelectorEventLoop:
  def _process_events(self,event_list):
  for key,mask in event_list:
  #从回调事件中获取到对应的数据,key.data在注册时是一个元祖,所以这里要对元祖进行解包
  fileobj,(reader,writer)=key.fileobj,key.data
  if mask&selectors.EVENT_READ and reader is not None:
  #得到reader handle,如果是被标记为取消,就移除对应的文件描述符
  if reader._cancelled:
  self._remove_reader(fileobj)
  else:
  #如果没被标记为取消,则安排到self._ready中
  self._add_callback(reader)
  if mask&selectors.EVENT_WRITE and writer is not None:
  #对于写对象,也是同样的道理。
  if writer._cancelled:
  self._remove_writer(fileobj)
  else:
  self._add_callback(writer)
  def _add_callback(self,handle):
  #把回调的handle添加到_ready中
  assert isinstance(handle,events.Handle),'A Handle is required here'
  if handle._cancelled:
  return
  assert not isinstance(handle,events.TimerHandle)
  self._ready.append(handle)
  def _remove_reader(self,fd):
  #如果事件循环已经关闭了,就不用操作了
  if self.is_closed():
  return False
  try:
  #查询文件描述符是否在selector中
  key=self._selector.get_key(fd)
  except KeyError:
  #不存在则返回
  return False
  else:
  #存在则进入移除的工作
  mask,(reader,writer)=key.events,key.data
  mask&=~selectors.EVENT_READ
  if not mask:
  #移除已经注册到selector的文件描述符
  self._selector.unregister(fd)
  else:
  self._selector.modify(fd,mask,(None,writer))
  if reader is not None:
  reader.cancel()
  return True
  else:
  return False


  从编码中可以看到_process_events会让事情相对应的文件描述符予以处理,并且从事情调整中掌握到相对应的Handle对象添加到self._ready中,由EventLoop在下面赋值self._ready并实施。


  能够看见互联网IO事件解决不复杂,由于系统软件事件循环早已给我们做了许多生活了,可是客户全部与互联网IO有关实际操作都必须有1个相似的实际操作,那样是十分繁琐复杂,幸好asyncio库已为我们做了封装形式,我们只需要启用就行了,便捷许多。


  综上所述,这篇文章就给大家介绍到这里了,希望可以为大家带来帮助。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129053.html

相关文章

  • PythonAsyncio中Coroutines,Tasks,Future可等候目标关联及功效

      此篇文章关键阐述了PythonAsyncio中Coroutines,Tasks,Future可等候目标关联及功效,文章内容紧扣主题进行详尽的基本介绍,必须的朋友可以学习一下  前记  上一篇阅读理解《Python中Async语法协同程序的完成》阐述了Python是如何用制作器来达到协同程序的及其PythonAsyncio根据Future和Task的封装形式来达到协同程序的生产调度,但在Pyth...

    89542767 评论0 收藏0
  • 从RocketMQ我们学到了什么之NameServer

    摘要:故事中的下属们,就是消息生产者角色,屋子右面墙根那块地就是消息持久化,吕秀才就是消息调度中心,而你就是消息消费者角色。下属们汇报的消息,应该叠放在哪里,这个消息又应该在哪里才能找到,全靠吕秀才的惊人记忆力,才可以让消息准确的被投放以及消费。 微信公众号:IT一刻钟大型现实非严肃主义现场一刻钟与你分享优质技术架构与见闻,做一个有剧情的程序员关注可了解更多精彩内容。问题或建议,请公众号留言...

    wangbjun 评论0 收藏0
  • 从RocketMQ我们学到了什么之NameServer

    摘要:故事中的下属们,就是消息生产者角色,屋子右面墙根那块地就是消息持久化,吕秀才就是消息调度中心,而你就是消息消费者角色。下属们汇报的消息,应该叠放在哪里,这个消息又应该在哪里才能找到,全靠吕秀才的惊人记忆力,才可以让消息准确的被投放以及消费。 微信公众号:IT一刻钟大型现实非严肃主义现场一刻钟与你分享优质技术架构与见闻,做一个有剧情的程序员关注可了解更多精彩内容。问题或建议,请公众号留言...

    Arno 评论0 收藏0
  • LVS负载均衡集群架构

    摘要:本文已获得原作者霸都民工哥授权。简单介绍虚拟服务器,是一个虚拟的服务器集群系统,可以在和系统中运行,年开发研究的项目。官方网站发展史在内核时,就已经以内核补丁的形式出现从。 本文已获得原作者霸都民工哥授权。 写在前面 为什么需要使用负载均衡呢?这是一个必较重要的问题 实际生产环境中某单台服务器已不能负载日常用访问压力时,就需要使用负载均衡,把用户的请求数据分担到(尽可能平均分配)后端所...

    yexiaobai 评论0 收藏0
  • LVS负载均衡集群架构

    摘要:本文已获得原作者霸都民工哥授权。简单介绍虚拟服务器,是一个虚拟的服务器集群系统,可以在和系统中运行,年开发研究的项目。官方网站发展史在内核时,就已经以内核补丁的形式出现从。 本文已获得原作者霸都民工哥授权。 写在前面 为什么需要使用负载均衡呢?这是一个必较重要的问题 实际生产环境中某单台服务器已不能负载日常用访问压力时,就需要使用负载均衡,把用户的请求数据分担到(尽可能平均分配)后端所...

    Scorpion 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<