问题 使用在python中调用外部命令来控制子进程的数量


我明白使用  是调用外部命令的首选方法。

但是如果我想在parall中运行几个命令,但是限制生成的进程数呢?困扰我的是我无法阻止子流程。例如,如果我打电话

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)

然后该过程将继续,无需等待 cmd 完成。因此,我不能把它包装在一个工人中 multiprocessing 图书馆。

例如,如果我这样做:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);

pool = Pool( processes = 10 );
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list];
ans = [res.get() for res in results];

然后每个工人将在产生子流程后完成并返回。所以我无法真正限制生成的进程数 subprocess 通过使用 Pool

什么是限制子过程数量的正确方法?


10461
2018-03-21 16:22


起源



答案:


您可以使用 subprocess.call 如果你想等待命令完成。看到 pydoc subprocess 了解更多信息。

你也可以打电话给 Popen.wait 你工人的方法:

def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
    p.wait()

6
2018-03-21 16:25



这会完全禁用并行处理 - qed
它不应该。问题是使用 multiprocessing 模块,每个工人都是在一个单独的过程中产生的,所以 wait()一个工人不会阻止其他工人跑步。也就是说,这本身并不正确 - 这个例子没有 return 来自工人的任何事情,所以打电话 .get() 结果不会返回任何内容。 - larsks


答案:


您可以使用 subprocess.call 如果你想等待命令完成。看到 pydoc subprocess 了解更多信息。

你也可以打电话给 Popen.wait 你工人的方法:

def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile);
    p.wait()

6
2018-03-21 16:25



这会完全禁用并行处理 - qed
它不应该。问题是使用 multiprocessing 模块,每个工人都是在一个单独的过程中产生的,所以 wait()一个工人不会阻止其他工人跑步。也就是说,这本身并不正确 - 这个例子没有 return 来自工人的任何事情,所以打电话 .get() 结果不会返回任何内容。 - larsks


您不需要多个Python进程甚至线程来限制并行子进程的最大数量:

from itertools import izip_longest
from subprocess import Popen, STDOUT

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
          for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
    for p in filter(None, processes):
        p.wait()

看到 在Python中用块(n)迭代迭代器?

如果您想限制并行子进程的最大和最小数量,可以使用线程池:

from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call

def run(cmd):
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
    if rc != 0:
        print('{cmd} failed with exit status: {rc}'.format(**vars()))

任何一个 limit 子进程结束,新的子进程开始维护 limit 始终有多个子流程。

或使用 ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call

with ThreadPoolExecutor(max_workers=limit) as executor:
    for cmd in commands:
        executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)

这是一个简单的线程池实现:

import subprocess
from threading import Thread

try: from queue import Queue
except ImportError:
    from Queue import Queue # Python 2.x


def worker(queue):
    for cmd in iter(queue.get, None):
        subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
    t.daemon = True
    t.start()

for cmd in commands:  # feed commands to threads
    q.put_nowait(cmd)

for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join()    # wait for completion

为避免过早退出,请添加异常处理。

如果要捕获字符串中的子进程输出,请参阅 Python:并行执行cat子进程


10
2018-03-21 17:13