资讯专栏INFORMATION COLUMN

Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案

lmxdawn / 3326人阅读

摘要:解决方法有两种。代码然而这段代码只有在运行在处的时候才能用中断,即前你按有效,一旦后则完全无效建议先确认是否真的需要用到多进程,如果是多的程序建议用多线程或协程,计算特别多则用多进程。

本文理论上对multiprocessing.dummy的Pool同样有效。

python2.x中multiprocessing提供的基于函数进程池,join后陷入内核态,按下ctrl+c不能停止所有的进程并退出。即必须ctrl+z后找到残留的子进程,把它们干掉。先看一段ctrl+c无效的代码:

#!/usr/bin/env python
import multiprocessing
import os
import time


def do_work(x):
    print "Work Started: %s" % os.getpid()
    time.sleep(10)
    return x * x


def main():
    pool = multiprocessing.Pool(4)
    try:
        result = pool.map_async(do_work, range(8))
        pool.close()
        pool.join()
        print result
    except KeyboardInterrupt:
        print "parent received control-c"
        pool.terminate()
        pool.join()
 

if __name__ == "__main__":
    main()

这段代码运行后,按^c一个进程也杀不掉,最后会残留包括主进程在内共5个进程(1+4),kill掉主进程能让其全部退出。很明显,使用进程池时KeyboardInterrupt不能被进程捕捉。解决方法有两种。

方案一

下面这段是python源码里multiprocessing下的pool.py中的一段,ApplyResult就是Pool用来保存函数运行结果的类

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

而下面这段代码也是^c无效的代码

if __name__ == "__main__":
    import threading

    cond = threading.Condition(threading.Lock())
    cond.acquire()
    cond.wait()
    print "done"

很明显,threading.Condition(threading.Lock())对象无法接收KeyboardInterrupt,但稍微修改一下,给cond.wait()一个timeout参数即可,这个timeout可以在map_async后用get传递,把

result = pool.map_async(do_work, range(4))

改为

result = pool.map_async(do_work, range(4)).get(1)

就能成功接收^c了,get里面填1填99999还是0xffff都行

方案二

另一种方法当然就是自己写进程池了,需要使用队列,贴一段代码感受下

#!/usr/bin/env python
import multiprocessing, os, signal, time, Queue

def do_work():
    print "Work Started: %d" % os.getpid()
    time.sleep(2)
    return "Success"

def manual_function(job_queue, result_queue):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while not job_queue.empty():
        try:
            job = job_queue.get(block=False)
            result_queue.put(do_work())
        except Queue.Empty:
            pass
        #except KeyboardInterrupt: pass

def main():
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    for i in range(6):
        job_queue.put(None)

    workers = []
    for i in range(3):
        tmp = multiprocessing.Process(target=manual_function,
                                      args=(job_queue, result_queue))
        tmp.start()
        workers.append(tmp)

    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print "parent received ctrl-c"
        for worker in workers:
            worker.terminate()
            worker.join()

    while not result_queue.empty():
        print result_queue.get(block=False)

if __name__ == "__main__":
    main()
方案三

使用一个全局变量eflag作标识,让SIG_INT信号绑定一个处理函数,在其中对eflag的值更改,线程的函数中以eflag的值判定作为while的条件,把语句写在循环里,老实说这个方案虽然可以用,但是简直太差劲。线程肯定是可行的,进程应该还需要多带带共享变量,非常不推荐的方式

常见的错误方案

这个必须要提一下,我发现segmentfault上都有人被误导了

理论上,在Pool初始化时传递一个initializer函数,让子进程忽略SIGINT信号,也就是^c,然后Pool进行terminate处理。代码

#!/usr/bin/env python
import multiprocessing
import os
import signal
import time


def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def run_worker(x):
    print "child: %s" % os.getpid()
    time.sleep(20)
    return x * x


def main():
    pool = multiprocessing.Pool(4, init_worker)
    try:
        results = []
        print "Starting jobs"
        for x in range(8):
            results.append(pool.apply_async(run_worker, args=(x,)))

        time.sleep(5)
        pool.close()
        pool.join()
        print [x.get() for x in results]
    except KeyboardInterrupt:
        print "Caught KeyboardInterrupt, terminating workers"
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    main()

然而这段代码只有在运行在time.sleep(5)处的时候才能用ctrl+c中断,即前5s你按^c有效,一旦pool.join()后则完全无效!

建议

先确认是否真的需要用到多进程,如果是IO多的程序建议用多线程或协程,计算特别多则用多进程。如果非要用多进程,可以利用Python3的concurrent.futures包(python2.x也能装),编写更加简单易用的多线程/多进程代码,其使用和Java的concurrent框架有些相似.
经过亲自验证,ProcessPoolExecutor是没有^c的问题的,要用多进程建议使用它

参考

http://bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges

http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool#comment12678760_6191991

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/37677.html

相关文章

  • python进阶笔记【1】--- 多进程

    摘要:很简单,这个模块实现了开辟一块共享内存空间,就好比中的方法一样,有兴趣的同学可以去查阅。查了下资料,返回的对象控制了一个进程,可用于多进程之间的安全通信,其支持的类型有和等。 有关于 multiprocessing 中共享变量的问题 现在的cpu都很强大,比方我用的至强2620有24核可以同时工作,并行执行进程程序。这在计算密集型的程序是很需要的,如沙漠中的绿洲,令人重获新生。那么,问...

    Wildcard 评论0 收藏0
  • Python 面试」第三次更新

    摘要:说一下进程线程以及多任务多进程多线程和协程进程概念一个程序对应一个进程,这个进程被叫做主进程,而一个主进程下面还有许多子进程。避免了由于系统在处理多进程或者多线程时,切换任务时需要的等待时间。 showImg(https://segmentfault.com/img/bVbuYxg?w=3484&h=2480); 阅读本文大约需要 10 分钟。 14.说一下进程、线程、以及多任务(多进...

    wslongchen 评论0 收藏0
  • python综合学习二之多进程

    摘要:本节讲学习的多进程。和之前的的不同点是丢向的函数有返回值,而的没有返回值。所以接下来让我们来看下这两个进程是否会出现冲突。 本节讲学习Python的多进程。 一、多进程和多线程比较 多进程 Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multip...

    gityuan 评论0 收藏0
  • Python_系统编程

    摘要:主进程会等待所有的子进程先结束,然后再结束主进程。关闭进程池,关闭后实例不再接收新的请求等待实例中的所有子进程执行完毕,主进程才会退出,必须放在语句之后。主进程一般都用来等待,任务在子进程中执行。 多任务:同一个时间段中,执行多个函数/运行多个程序. 操作系统可以同时运行多个任务:操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,...

    wuaiqiu 评论0 收藏0
  • python简单好用进程间数据通讯模块multiprocessing.Manager

    摘要:目前开发中有遇到进程间需要共享数据的情况所以研究了下主要会以为例子说明下进程间共享同一个父进程使用说明创建一个对象创建一个创建一个测试程序创建进程池进行测试简单的源码分析这时我们再看一个例子创建一个对象创建一个创建一个测试程序创建进程池进行 目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共...

    jeyhan 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<