资讯专栏INFORMATION COLUMN

Python 学习笔记 并发 future

lewif / 3239人阅读

摘要:和类是高级类,大部分情况下只要学会使用即可,无需关注其实现细节。类与类十分相似,只不过一个是处理进程,一个是处理线程,可根据实际需要选择。示例运行结果不同机器运行结果可能不同。

concurrent.futures模块

该模块主要特色在于ThreadPoolExecutor 和 ProcessPoolExecutor 类,这两个类都继承自concurrent.futures._base.Executor类,它们实现的接口能分别在不同的线程或进程中执行可调用的对象,它们都在内部维护着一个工作线程或者进程池。

ThreadPoolExecutor 和 ProcessPoolExecutor 类是高级类,大部分情况下只要学会使用即可,无需关注其实现细节。

####ProcessPoolExecutor 类

>class ThreadPoolExecutor(concurrent.futures._base.Executor)

>|  This is an abstract base class for concrete asynchronous executors.

>|  Method resolution order:

>|      ThreadPoolExecutor

 |      concurrent.futures._base.Executor

 |      builtins.object

 |

 |  Methods defined here:

 |

 |  init(self, max_workers=None, thread_name_prefix="")

 |      Initializes a new ThreadPoolExecutor instance.

 |

 |      Args:

 |          max_workers: The maximum number of threads that can be used to

 |              execute the given calls.

 |          thread_name_prefix: An optional name prefix to give our threads.

 |

 |  shutdown(self, wait=True)

 |      Clean-up the resources associated with the Executor.

 |

 |      It is safe to call this method several times. Otherwise, no other

 |      methods can be called after this one.

 |

 |      Args:

 |          wait: If True then shutdown will not return until all running

 |              futures have finished executing and the resources used by the

 |              executor have been reclaimed.

 |

 |  submit(self, fn, *args, **kwargs)

 |      Submits a callable to be executed with the given arguments.

 |

 |      Schedules the callable to be executed as fn(*args, **kwargs) and returns

 |      a Future instance representing the execution of the callable.

 |

 |      Returns:

 |          A Future representing the given call.

 |

 |  ----------------------------------------------------------------------

 |  Methods inherited from concurrent.futures._base.Executor:

 |

 |  enter(self)

 |

 |  exit(self, exc_type, exc_val, exc_tb)

 |

 |  map(self, fn, *iterables, timeout=None, chunksize=1)

 |      Returns an iterator equivalent to map(fn, iter).

 |

 |      Args:

 |          fn: A callable that will take as many arguments as there are

 |              passed iterables.

 |          timeout: The maximum number of seconds to wait. If None, then there

 |              is no limit on the wait time.

 |          chunksize: The size of the chunks the iterable will be broken into

 |              before being passed to a child process. This argument is only

 |              used by ProcessPoolExecutor; it is ignored by

 |              ThreadPoolExecutor.

 |

 |      Returns:

 |          An iterator equivalent to: map(func, *iterables) but the calls may

 |          be evaluated out-of-order.

 |

 |      Raises:

 |          TimeoutError: If the entire result iterator could not be generated

 |              before the given timeout.

 |          Exception: If fn(*args) raises for any values.



初始化可以指定一个最大进程数作为其参数 max_workers 的值,该值一般无需指定,默认为当前运行机器的核心数,可以由os.cpu_count()获取;类中含有方法:

map()方法,与python内置方法map() 功能类似,也就是映射,参数为:

一个可调用函数 fn

一个迭代器 iterables

超时时长 timeout

块数chuncksize 如果大于1, 迭代器会被分块处理

---->> 该函数有一个特性:其返回结果与调用开始的顺序是一致的;在调用过程中不会产生阻塞,也就是说可能前者被调用执行结束之前,后者被调用已经执行结束了。

如果一定要获取到所有结果后再处理,可以选择采用submit()方法和futures.as_completed函数结合使用。

shutdown()方法,清理所有与当前执行器(executor)相关的资源

submit() 方法,提交一个可调用对象给fn使用

从concurrent.futures._base.Executor继承了__enter__() 和 __exit__()方法,这意味着ProcessPoolExecutor 对象可以用于with 语句。

from concurrent import futures
with futures.ProcessPoolExecutor(max_works=3) as executor:
     executor.map()

ThreadPoolExecutor类
class ThreadPoolExecutor(concurrent.futures._base.Executor)

 |  This is an abstract base class for concrete asynchronous executors.

 |

 |  Method resolution order:

 |      ThreadPoolExecutor

 |      concurrent.futures._base.Executor

 |      builtins.object

 |

 |  Methods defined here:

 |

 |  init(self, max_workers=None, thread_name_prefix="")

 |      Initializes a new ThreadPoolExecutor instance.

 |

 |      Args:

 |          max_workers: The maximum number of threads that can be used to

 |              execute the given calls.

 |          thread_name_prefix: An optional name prefix to give our threads.

 |

 |  shutdown(self, wait=True)

 |      Clean-up the resources associated with the Executor.

 |

 |      It is safe to call this method several times. Otherwise, no other

 |      methods can be called after this one.

 |

 |      Args:

 |          wait: If True then shutdown will not return until all running

 |              futures have finished executing and the resources used by the

 |              executor have been reclaimed.

 |

 |  submit(self, fn, *args, **kwargs)

 |      Submits a callable to be executed with the given arguments.

 |

 |      Schedules the callable to be executed as fn(*args, **kwargs) and returns

 |      a Future instance representing the execution of the callable.

 |

 |      Returns:

 |          A Future representing the given call.

 |

 |  ----------------------------------------------------------------------

 |  Methods inherited from concurrent.futures._base.Executor:

 |

 |  enter(self)

 |

 |  exit(self, exc_type, exc_val, exc_tb)

 |

 |  map(self, fn, *iterables, timeout=None, chunksize=1)

 |      Returns an iterator equivalent to map(fn, iter).

 |

 |      Args:

 |          fn: A callable that will take as many arguments as there are

 |              passed iterables.

 |          timeout: The maximum number of seconds to wait. If None, then there

 |              is no limit on the wait time.

 |          chunksize: The size of the chunks the iterable will be broken into

 |              before being passed to a child process. This argument is only

 |              used by ProcessPoolExecutor; it is ignored by

 |              ThreadPoolExecutor.

 |

 |      Returns:

 |          An iterator equivalent to: map(func, *iterables) but the calls may

 |          be evaluated out-of-order.

 |

 |      Raises:

 |          TimeoutError: If the entire result iterator could not be generated

 |              before the given timeout.

 |          Exception: If fn(*args) raises for any values.

与ProcessPoolExecutor 类十分相似,只不过一个是处理进程,一个是处理线程,可根据实际需要选择。

示例
from time import sleep, strftime
from concurrent import futures


def display(*args):
    print(strftime("[%H:%M:%S]"), end="")
    print(*args)


def loiter(n):
    msg = "{}loiter({}): doing nothing for {}s"
    display(msg.format("	"*n, n, n))
    sleep(n)
    msg = "{}loiter({}): done."
    display(msg.format("	"*n, n))
    return n*10


def main():
    display("Script starting")
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display("results:", results)
    display("Waiting for individual results:")
    for i, result in enumerate(results):
        display("result {} : {}".format(i, result))


if __name__ == "__main__":
    main()

运行结果:

[20:32:12]Script starting
[20:32:12]loiter(0): doing nothing for 0s
[20:32:12]loiter(0): done.
[20:32:12]      loiter(1): doing nothing for 1s
[20:32:12]              loiter(2): doing nothing for 2s
[20:32:12]results: .result_iterator at 0x00000246DB21BC50>
[20:32:12]Waiting for individual results:
[20:32:12]                      loiter(3): doing nothing for 3s
[20:32:12]result 0 : 0
[20:32:13]      loiter(1): done.
[20:32:13]                              loiter(4): doing nothing for 4s
[20:32:13]result 1 : 10
[20:32:14]              loiter(2): done.
[20:32:14]result 2 : 20
[20:32:15]                      loiter(3): done.
[20:32:15]result 3 : 30
[20:32:17]                              loiter(4): done.
[20:32:17]result 4 : 40

不同机器运行结果可能不同。

示例中设置max_workers=3,所以代码一开始运行,则有三个对象(0,1,2)被执行loiter() 操作; 三秒后,对象0运行结束,得到结果result 0之后,对象3才开始被执行,同理,对象4的执行时间在对象1执行结果result 1打印结束之后。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42326.html

相关文章

  • python并发 1:使用 futures 处理并发

    摘要:标准库中所有阻塞型函数都会释放,允许其他线程运行。如果调用引发异常,那么当从迭代器检索其值时,将引发异常。总结自版就支持线程了,只不过是使用线程的最新方式。类封装了模块的组件,使使用线程变得更加方便。下一篇笔记应该是使用处理并发。 作为Python程序员,平时很少使用并发编程,偶尔使用也只需要派生出一批独立的线程,然后放到队列中,批量执行。所以,不夸张的说,虽然我知道线程、进程、并行、...

    Kyxy 评论0 收藏0
  • Python

    摘要:最近看前端都展开了几场而我大知乎最热语言还没有相关。有关书籍的介绍,大部分截取自是官方介绍。但从开始,标准库为我们提供了模块,它提供了和两个类,实现了对和的进一步抽象,对编写线程池进程池提供了直接的支持。 《流畅的python》阅读笔记 《流畅的python》是一本适合python进阶的书, 里面介绍的基本都是高级的python用法. 对于初学python的人来说, 基础大概也就够用了...

    dailybird 评论0 收藏0
  • Python基础之使用期物处理并发

    摘要:本文重点掌握异步编程的相关概念了解期物的概念意义和使用方法了解中的阻塞型函数释放的特点。一异步编程相关概念阻塞程序未得到所需计算资源时被挂起的状态。 导语:本文章记录了本人在学习Python基础之控制流程篇的重点知识及个人心得,打算入门Python的朋友们可以来一起学习并交流。 本文重点: 1、掌握异步编程的相关概念;2、了解期物future的概念、意义和使用方法;3、了解Python...

    asoren 评论0 收藏0
  • python进阶笔记【2】 --- 一个奇怪的 __future__ 库

    摘要:正文总所周知,和根本就是两个东西,每次因为这个兼容性的问题都会把自己搞疯。提供了模块,把下一个新版本的特性导入到当前版本,于是我们就可以在当前版本中测试一些新版本的特性。传送门不多,才个。 写在前面 我是在学习cs231n的assignment3的课程,发现里面的代码大量频繁出现了这个库,那我就很奇怪了,为什么有个future这个奇怪名字的库会出现呢?到底这个库又有什么用?下面就让我为...

    Achilles 评论0 收藏0
  • 这篇博客和你唠唠 python 并发,滚雪球学python第四季,第16篇

    摘要:图片下载属于操作,比较耗时,基于此,可以利用中的多线程将其实现。更多精彩滚雪球学完结滚雪球学第二轮完结滚雪球学第三轮滚雪球学番外篇完结 在 python 编码过程中...

    qpwoeiru96 评论0 收藏0

发表评论

0条评论

lewif

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<