问题 多处理在进程之间共享不可序列化的对象


有三个问题可能重复(但过于具体):

通过回答这个问题,可以回答所有其他三个问题。 希望我能说清楚:

一旦我在多处理创建的某个进程中创建了一个对象:

  1. 我怎么通过 参考 那个对象到另一个进程?
  2. (不是那么重要)我如何确保在持有参考时这个过程不会消失?

例1(已解决)

from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]

例2

假设 f 返回一个具有可变状态的对象。应该可以从其他进程访问此相同的对象。

例3

我有一个具有打开文件和锁的对象 - 如何授予对其他进程的访问权限?

提醒

我不希望出现此特定错误。或者是这个特定用例的解决方案。解决方案应足够通用,以便在进程之间共享不可移动的对象。可以在任何进程中创建对象。使所有对象可移动并保持身份的解决方案也可以是好的。

任何提示都是受欢迎的,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的。所以我们可以一起创建解决方案。

这是一个 尝试 解决这个问题,但没有多处理: https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

您希望其他进程使用引用做什么?

引用可以传递给使用多处理创建的任何其他进程(重复3)。一个人可以访问属性,调用引用。访问的属性可能是也可能不是代理。

使用代理有什么问题?

也许没有问题,只有挑战。我的印象是代理有一个管理器,管理器有自己的进程,所以必须序列化和转移不可序列化的对象(部分用StacklessPython / fork解决)。 还存在特殊对象的代理 - 为所有对象(可解决的)构建代理很难但不是不可能的。

解? - 代理+经理?

Eric Urban表明序列化不是问题。真正的挑战在于Example2&3:状态的同步。我对解决方案的想法是为经理创建一个特殊的代理类。这个代理类

  1. 为不可序列化的对象采用构造函数
  2. 获取可序列化对象并将其传输到管理器进程。
  3. (问题)根据1.必须在经理过程中创建不可序列化的对象。

12463
2018-02-23 12:37


起源

应编辑该问题以解释您希望其他进程对引用执行的操作。只将它们传回原来的过程? - Armin Rigo
编辑了它。告诉我,如果这不能回答问题,谢谢。 - User
使用代理有什么问题? - Kritzefitz
我编辑了这个问题。谢谢你的回答,非常有见地。 - User
所以我想在上一篇文章中说的是,我没有看到任何一个例子,将对象转移到管理器而不是首先在那里创建它真的更好。 - Kritzefitz


答案:


大多数情况下,将现有对象的引用传递给另一个进程并不是很理想。而是创建要在进程之间共享的类:

class MySharedClass:
    # stuff...

然后你创建一个这样的代理管理器:

import multiprocessing.managers as m
class MyManager(m.BaseManager):
    pass # Pass is really enough. Nothing needs to be done here.

然后在该Manager上注册您的课程,如下所示:

MyManager.register("MySharedClass", MySharedClass)

然后,一旦经理被实例化并开始,那么 manager.start() 您可以使用创建类的共享实例 manager.MySharedClass。这应该适用于所有需求。返回的代理与原始对象的工作原理完全相同,除了在。中描述的一些例外情况 文件


9
2018-03-05 09:59



这很棒!我测试了它,效果很好。 codepad.org/zW2LU6XV 仍然存在并发问题。但这些都没关系。 - User
但这并没有解决问题。我已将此代码用作MySharedClass的模板,该模板具有(模拟)数据库游标。如果我尝试在MySharedClass方法中返回它,我会收到Unserializable Message错误。 - sinwav
@sinwav据我所知,在进程之间共享数据库游标是不可能的。无论您在进程之间使用何种传输机制,在某些时候对象都需要以某种方式进行序列化。 Python为此目的使用酸洗。如果你不能腌制某些东西就有理由。对于数据库游标,问题是,游标仅在创建它的连接上有效,但该连接仅在一个进程中打开。这得出结论,数据库游标仅在创建它的过程中有效。这里没有分享。 - Kritzefitz


在阅读本答案之前,请注意其中解释的解决方案很糟糕。请注意答案结尾处的警告。

我找到了一种通过共享对象状态的方法 multiprocessing.Array。 所以我创建了这个透明地通过所有进程共享状态的类:

import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    l = arr[:]
    return bytes(arr)

您可以将此类的实例(及其子类)传递给任何其他进程,并通过所有进程同步它的状态。 这是用这段代码测试的:

class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)

这个类的“神奇之处”在于它将所有属性存储在该类的另一个实例中 Store。这堂课不是很特别。它只是一些可以拥有任意属性的类。 (dict也可以。)

然而,这堂课有一些非常讨厌的怪癖。我发现了两个。

第一个怪癖是你必须指定多少空间 Store 实例最多需要。这是因为 multiprocessing.Array 具有静态大小。因此可以在其中腌制的对象只能与数组一样大。

第二个怪癖是你不能将这个类与ProcessPoolExecutors或简单的Pools一起使用。如果您尝试这样做,则会收到错误消息:

>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
... 
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

警告
您可能不应该使用这种方法,因为它使用无法控制的内存量,与使用代理相比过于复杂(请参阅我的其他答案)并且可能会以惊人的方式崩溃。


4
2018-03-02 21:03





只需使用无堆栈python。您可以使用几乎任何序列化 pickle,包括功能。在这里,我序列化和反序列化 lambda 使用 pickle 模块。这类似于您在示例中尝试执行的操作。

这是Stackless Python的下载链接 http://www.stackless.com/wiki/Download

Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> f = 5
>>> g = lambda : f * f
>>> g()
25
>>> import pickle
>>> p = pickle.dumps(g)
>>> m = pickle.loads(p)
>>> m()
25
>>> 

3
2018-02-23 16:08



+1这很好但是1.确实是保留标识m是g和2.如果我序列化函数并在其他进程中反序列化它,它是否会在原始进程中调用? - 不。但如果您需要在进程关闭时保存该功能,这绝对是一个很好的解决方案。 - User