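I have some code that needs to run against several other systems that may hang or have problems outside my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program, and then terminate them when they hang or run into problems, but I am not sure of the best way to go about this.
When terminate() is called it does kill the child process, but the child becomes a defunct zombie that is not released until the Process object goes away. The example code below, where the loop never ends, manages to kill the child and allows respawning when run() is called again, but that does not seem like a good way of going about this (i.e. creating the multiprocessing.Process() in __init__() would be better).
Does anyone have a suggestion?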
import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self, running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)
You can run the child process in the background as a daemon:
process.daemon = True
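Any errors or hangs (or infinite loops) in a daemon process will not affect the main process, and the daemon is only terminated once the main process exits.
This works for simple problems, until you end up with many daemonic child processes that keep consuming memory with no explicit control from the parent process.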
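A minimal sketch of the daemon approach, assuming it is acceptable for a stuck child to keep running until the parent exits. The names stuck_call and start_daemon are hypothetical; daemon, start() and is_alive() are the standard multiprocessing.Process API. Note that daemon has to be set before start().

```python
import multiprocessing as mp
import time


def stuck_call():
    """Hypothetical stand-in for a call into an external system that never returns."""
    while True:
        time.sleep(1)


def start_daemon(target):
    """Start `target` in a daemonic child process and return the Process object."""
    p = mp.Process(target=target)
    p.daemon = True  # must be set before start()
    p.start()
    return p


if __name__ == "__main__":
    p = start_daemon(stuck_call)
    time.sleep(0.5)
    print("daemon alive:", p.is_alive())
    # No join() here: when the main process exits, multiprocessing terminates
    # any daemonic children that are still running, so the hung child dies with it.
```

The limitation mentioned above still applies: nothing here detects the hang or reclaims the child before the parent itself exits.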
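The best way is to set up a Queue through which all child processes communicate with the parent, so that we can join() them and clean up properly. Here is some simple code that checks whether a child process is hanging (simulated with time.sleep(1000)) and sends a message to the queue for the main process to act on: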
import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print "hanging..."
                time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WATCHDOG":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()
Without terminating the worker, trying to join() it from the main process would block forever, because the worker never finishes.
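Process.join() also accepts a timeout, which lets the parent wait for a child without risking blocking forever. A minimal sketch, assuming the child does not install a handler that ignores SIGTERM; join_or_kill and never_finishes are hypothetical names used for illustration:

```python
import multiprocessing as mp
import time


def never_finishes():
    """Hypothetical child that hangs indefinitely."""
    while True:
        time.sleep(1)


def join_or_kill(p, timeout):
    """Wait up to `timeout` seconds for p; if it is still running, terminate and reap it.

    Returns True if the process finished on its own, False if it had to be killed.
    """
    p.join(timeout)   # returns after `timeout` even if p is still alive
    if p.is_alive():
        p.terminate()
        p.join()      # returns promptly once the child has handled SIGTERM
        return False
    return True


if __name__ == "__main__":
    p = mp.Process(target=never_finishes)
    p.start()
    print("finished on its own:", join_or_kill(p, timeout=1.0))
```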
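The way Python's multiprocessing handles processes is a bit confusing.
From the multiprocessing programming guidelines:
Joining zombie processes
On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process's Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.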
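The passage above implies that multiprocessing.active_children() can double as a cheap way to reap children that have already finished. A small illustration, assuming a short-lived child; the helper name reap_finished is hypothetical:

```python
import multiprocessing as mp
import time


def reap_finished():
    """Return the children that are still running; finished ones get joined as a side effect."""
    return mp.active_children()


if __name__ == "__main__":
    p = mp.Process(target=time.sleep, args=(0.1,))
    p.start()
    time.sleep(1.0)                  # give the child time to finish
    running = reap_finished()
    print("still running:", running)
    print("exit code:", p.exitcode)  # 0 once the child has exited normally
```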
To keep a process from becoming a zombie, you need to call its join() method once you have killed it.
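Applied to the class from the question, that could look like the sketch below. It assumes a Unix-like system (where a child killed by terminate() reports exitcode == -signal.SIGTERM); Supervisor and work_loop are hypothetical names. Because a Process object can only be started once, a new one is created on every start(), while the shared running_flag is created once in __init__() and survives restarts.

```python
import multiprocessing as mp
import time


def work_loop(running_flag):
    """Hypothetical worker: does one unit of work per tick while the flag is set."""
    while True:
        if running_flag.value:
            pass  # a call into the external system would go here
        time.sleep(0.05)


class Supervisor:
    """Hypothetical wrapper: a fresh child per start(), always joined after terminate()."""

    def __init__(self):
        # Shared state is created once and handed to every new child process.
        self.running_flag = mp.Value("i", 1)
        self.process = None

    def start(self):
        # A Process object can only be started once, so build a new one each time.
        self.process = mp.Process(target=work_loop, args=(self.running_flag,))
        self.process.start()
        return self.process.pid

    def pause_resume(self):
        self.running_flag.value = int(not self.running_flag.value)

    def terminate(self, timeout=5.0):
        """Kill the child and join it so it does not linger as a zombie."""
        if self.process is None:
            return None
        self.process.terminate()
        self.process.join(timeout)
        return self.process.exitcode


if __name__ == "__main__":
    s = Supervisor()
    print("started pid", s.start())
    time.sleep(0.5)
    print("exit code:", s.terminate())
    print("restarted pid", s.start())
    s.terminate()
```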
If you want a simpler way to deal with hanging calls in your system, you can take a look at pebble.
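(Not enough reputation points to comment, hence a full answer.)
@PieOhPah: thank you for this very nice example.
Unfortunately, there is one small flaw that prevents the watchdog from killing the worker: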
if msg == "KILL WATCHDOG":
It should be:
if msg == "KILL WORKER":
So the code becomes (with the print calls updated for Python 3):
import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print("working {0} ...".format(count))
            count += 1
            q.put(count)
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print("hanging...")
                time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process isn't sending anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print("[MAIN]: starting process P1")
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        # if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print("[MAIN]: Joined WORKER successfully!")
                q.close()
                break # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()
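One caveat about this design: the watchdog and the main loop both read from the same queue, so heartbeats consumed by the main loop never reach the watchdog (its 10-second timeout may fire even while the worker is healthy), and the watchdog could in principle read back its own "KILL WORKER" message. A possible simplification, sketched here under the assumption that the parent has nothing else to do while it waits, drops the separate watchdog process and lets the parent apply the timeout itself with Queue.get(timeout=...). The names worker and supervise and the timing values are illustrative, not part of the answer above:

```python
import multiprocessing as mp
import queue
import time


def worker(q, hang_after):
    """Send one heartbeat per iteration, then simulate a hang."""
    count = 0
    while True:
        count += 1
        q.put(count)          # heartbeat to the parent
        time.sleep(0.1)
        if count >= hang_after:
            time.sleep(1000)  # simulated hang: no more heartbeats


def supervise(hang_after=3, timeout=2.0):
    """Run the worker; kill and join it if no heartbeat arrives within `timeout` seconds.

    Returns the last heartbeat received and the worker's exit code.
    """
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q, hang_after))
    p.start()
    last = None
    while True:
        try:
            last = q.get(timeout=timeout)
        except queue.Empty:
            # No heartbeat for `timeout` seconds: treat the worker as hung.
            p.terminate()
            p.join()  # reap it so it does not linger as a zombie
            break
    q.close()
    q.join_thread()
    return last, p.exitcode


if __name__ == "__main__":
    last, code = supervise()
    print("last heartbeat:", last, "exit code:", code)
```

Because the parent blocks in q.get(), this version suits a supervisor whose only job is to watch a single worker; with several workers you would likely keep one heartbeat queue per worker or tag each message with its sender.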