问题 如何监控python的concurrent.futures.ProcessPoolExecutor?


我们正在使用 ProcessPoolExecutor 从 concurrent.futures 在异步接收请求的服务中,并在进程池中执行实际的同步处理。

一旦我们遇到流程池耗尽的情况,那么新请求必须等到其他一些流程完成。

有没有办法询问进程池的当前用法?这将使我们能够监控他们的状态并进行适当的容量规划。

如果没有,是否有任何良好的替代流程池实现与支持此类监控/容量规划的异步接口?


5829
2018-02-01 22:15


起源

只需查看工作队列的长度即可 ProcessPoolExecutor._pending_work_items。如果它大于零,则表示工作项等待。 - fpbhb
@fpbhb这是一个私有属性,这是不使用它的好理由,它也是一个二进制信号,因此对预防措施不利。所以,谢谢,但我希望有更好的东西。 - moritz
这是Python,不是吗?但除此之外:你想要实现什么?动态调整工人数量 决不 有工作在等?这既不支持 concurrent.futures 也不是 multiprocessing.pool。这也是毫无意义的,因为一旦你的硬件资源耗尽了一些东西 具有 等待。 - fpbhb
例如,@ fpbhb添加更多(虚拟)硬件 - moritz
为此,您需要一个遍布多台计算机的进程池。使用您现在使用的流程池无法完成。您需要一个网络机制来分配负载,例如AMQP或类似的。 - fpbhb


答案:


最简单的方法是扩展 ProcessPoolExecutor 有理想的行为。以下示例维护stdlib接口,但不访问实现细节:

from concurrent.futures import ProcessPoolExecutor


class MyProcessPoolExecutor(ProcessPoolExecutor):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._running_workers = 0

    def submit(self, *args, **kwargs):
        future = super().submit(*args, **kwargs)
        self._running_workers += 1
        future.add_done_callback(self._worker_is_done)
        return future

    def _worker_is_done(self, future):
        self._running_workers -= 1

    def get_pool_usage(self):
        return self._running_workers

10
2018-02-05 17:05



我必须修复参数列表 _worker_is_done (已在上面的文本中修复),它接收未来作为参数,因此除此之外还需要一个参数 self。现在它有效,谢谢! - moritz


我最近以稍微不同的方式为自己解决了这个问题。简化,这是我做的:

  • 我在我的主循环范围内定义的集合中跟踪外部未决期货。
  • 我附加一个回调到每个未来,这个回调是对期货组合的关闭,允许它在完成后从集合中移除未来。

所以,鉴于此 done() 是个 实际 回调函数,在别处定义,以下是在我的主循环范围内定义的:

bag = set()

def make_callback(b):

    def callback(f):
        nonlocal b
        b.remove(f)
        done(f)

    return callback

为每个未来 f 我提交给ProcessPoolExecutor,我添加回调:

f.add_done_callback(make_callback(bag))

在任何时候,都可以通过查看的内容来查看待处理和正在运行的期货列表 bag,可选择根据未来的结果进行过滤 running() 方法。例如。:

print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))

对于许多简单的用例,模块级的set变量可能与闭包一样有效。


2
2018-06-06 10:57