问题 如何使用python的多处理来终止进程


我有一些代码需要针对可能挂起或有不受我控制的问题的其他几个系统运行。我想使用python的多处理来生成子进程独立于主程序运行,然后当它们挂起或有问题终止它们时,但我不确定最好的方法来解决这个问题。

当调用terminate时,它确实会终止子进程,但它会变成一个已经失效的僵尸,直到进程对象消失才会被释放。循环永远不会结束的下面的示例代码可以杀死它,并在再次调用时允许重新生成,但似乎不是一个很好的方法(即multiprocessing.Process()在__init __()中会更好)。

有人有建议吗?

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self,running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)

1949
2017-08-17 14:56


起源

我认为直接的答案是你不想终止你的过程 - 因为你发现它们变成了可以抓住记忆或知道什么的僵尸。由于我在多处理方面的经验有限,我更加警惕让工人们优雅地死去。是否 这个答案 在基本相同的主题上回答你的问题? - KobeJohn
不完全的。我已经阅读了许多类似的线程,他们期望发生异常然后你可以处理它。我有一些东西正在运行,但挂起。操作员可能会发现某些东西没有正确运行,我想要一种方法来停止这个过程。因为挂起它不会响应像通过multiprocessing.Value()发送停止标志之类的东西。可能,他们会做的是杀死整个程序,然后重新启动它,所以我想知道是否有一个很好的方法在程序中只为子进程恢复。 - Dan Littlejohn
我想我知道你的意思但仍然是我发现的唯一答案如上所述 - 你只需要做任何你能做的事情来避免“挂”(例如包装每个进程的一个包罗万象的例外)。作为进一步的证据,在许多情况下,线程可以说是更简单的解决方案 它有同样的建议。在这一点上,以防你没有检查 - 确保你真的需要多处理而不是线程,因为每个都有自己的优点和缺点。 - KobeJohn


答案:


您可以在后台运行子进程作为守护进程。

process.daemon = True

守护进程中的任何错误和挂起(或无限循环)都不会影响主进程,只有在主进程退出后才会终止它。

这将适用于简单的问题,直到您遇到许多子守护进程,这些进程将在没有任何明确控制的情况下继续从父进程中获取内存。

最好的方法是建立一个 Queue 让所有子进程与父进程通信,以便我们可以 join 他们和清理干净。这是一些简单的代码,用于检查子处理是否挂起(也就是说 time.sleep(1000)),并向队列发送消息,主进程对其采取行动:

import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print "hanging..."
                time.sleep(1000)

def watchdog(q):
    """
    This check the queue for updates and send a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WATCHDOG":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()

没有终止 worker, 尝试 join() 它到主流程将永远阻止 worker 从未完成。


6
2017-08-18 02:36





Python多处理处理进程的方式有点令人困惑。

从多处理指南:

加入僵尸进程

在Unix上,当一个进程完成但尚未加入时,它就变成了一个僵尸。应该永远不会有很多,因为每次启动一个新进程(或调用active_children())时,所有尚未加入的已完成进程都将被连接。同时调用已完成的进程的Process.is_alive将加入该进程。即便如此,明确加入您开始的所有流程也许是一种好习惯。

为了避免进程变成僵尸,你需要调用它 join() 方法一旦你杀了它。

如果您想要一种更简单的方法来处理系统中的挂起呼叫,您可以查看 卵石


6
2017-08-22 08:37



所以仍然需要打电话 process.terminate() 确保流程终止或 join() 只应该足以杀死他们? - weefwefwqg3
terminate 通过a向目标进程发出终止请求 SIGTERM 信号。这取决于满足请求的过程(在大多数情况下他们这样做)。如果该过程已经结束,则无效。 join 只是指示操作系统回收过程资源,否则它将在此之前阻塞。 join 必须始终在已结束的进程上调用,否则操作系统将堆积耗尽进程的资源。它们通常被称为“僵尸”进程,因为它们实际上是一次运行的空壳。 - noxdafox


(没有足够的声誉点评论,特此完整答案)

@PieOhPah:谢谢你这个非常好的例子。
不幸的是,只有一个小缺陷不会让看门狗杀死工人:

if msg == "KILL WATCHDOG":

它应该是:

if msg == "KILL WORKER":

所以代码变为(对python3更新了打印):

import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print ("working {0} ...".format(count))
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print ("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    This check the queue for updates and send a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print ("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print ("[MAIN]: starting process P1")
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
#        if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print ("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print ("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print ("[MAIN]: Joined WORKER successfully!")
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()

1
2018-06-02 20:58