问题 在子进程已经开始之后访问共享内存


如果数据仅在子进程生成后可用,则如何为子进程提供对共享内存中数据的访问权限(使用 multiprocessing.Process)?

我知道 multiprocessing.sharedctypes.RawArray,但我无法弄清楚如何让我的子进程访问a RawArray 在进程已经开始之后创建。

数据由父进程生成,并且事先不知道数据量。

如果不是为了 GIL 我将使用线程,这将使这项任务更简单。使用非CPython实现不是一种选择。


在引擎盖下看 muliprocessing.sharedctypes,看起来分配了共享的ctype对象 运用 mmap记忆

所以这个问题真的归结为: 如果子进程可以访问匿名映射的内存 mmap() 在生成子进程后,父进程调用了它?

这有点像被问到的内容 这个问题,除了在我的情况下是调用者 mmap() 是父进程而不是子进程。


(解决了)

我创建了自己的版本 RawArray 用的 shm_open() 在引擎盖下。生成的共享ctypes数组可以与任何进程共享,只要标识符(tag) 火柴。

看到 这个答案 有关详细信息和示例。


10661
2017-09-14 15:44


起源

不是吗 开始 你的进程与消息容器(序列, RawArray,无论如何)作为论点?虽然最初是空的,但它将被作为参考传递(这里不确定)并且进程应该能够读取(并写入)它......或者我错了? - eudoxos
事先不知道元素的数量,所以我还不能创建容器(除非我创建一个非常大的容器以满足所有可能性)。 - Shawn Chin
如果你创造 [],你可以随意调整大小,但它仍然是同一个对象...我错过了什么? - eudoxos
这不适用于整个流程。 - Shawn Chin
为什么不从基本的IPC文件或管道/插座开始? - Maxim Egorushkin


答案:


你的问题听起来非常适合 posix_ipc 要么 sysv_ipc 模块,公开POSIX或SysV API以获取共享内存,信号量和消息队列。这里的特征矩阵包括在他提供的模块中挑选的极好建议。

匿名的问题 mmap(2) 区域是你不能轻易地与其他进程共享 - 如果它们是文件支持的,它很容易,但如果你实际上不需要该文件的任何其他过程,它会感到愚蠢。您 可以 使用 CLONE_VM 国旗 clone(2) 系统调用,如果这是在C,但我不想尝试使用语言解释器,可能会对内存安全性做出假设。 (即使在C中它也有点危险,因为五年后维护程序员可能会这样做  感到震惊 CLONE_VM 行为。)

但SysV和更新的POSIX共享内存映射允许甚至不相关的进程通过标识符附加和分离共享内存,因此您需要做的就是从创建与使用映射的进程的映射的进程共享标识符,然后当您在映射中操作数据时,它们可同时用于所有进程,而无需任何额外的解析开销。该 shm_open(3) 函数返回一个 int 在以后的调用中用作文件描述符 ftruncate(2)接着 mmap(2),因此其他进程可以使用共享内存段而不在文件系统中创建文件 - 即使所有使用它的进程都已退出,此内存仍将保留。 (或许对Unix来说有点奇怪,但它很灵活。)


5
2017-09-15 23:04



啊哈!这听起来可行。我会试一试,然后再回复你。谢谢! (我一直在寻找暴露的python模块 shm_open和朋友们,我发现的一切都是 SHM 看起来有点陈旧)。 - Shawn Chin
你的帖子让我去了 一个办法。非常感谢。 - Shawn Chin
@sarnold - 而不是clone()我强烈希望使用minherit进行mmaped区域 - Good Person


免责声明:我是问题的作者。

我最终用了 posix_ipc 模块创建我自己的版本 RawArray。我主要使用 posix_ipc.SharedMemory 哪个叫 shm_open() 在引擎盖下。

我的实施(ShmemRawArray)暴露相同的功能 RawArray 但需要两个额外的参数 - a tag 唯一地标识共享内存区域,和 create 用于确定是否应创建新的共享内存段或附加到现有内存段的标志。

如果有人有兴趣,这是一份副本: https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

使用说明:

  • 前两个args(typecode_or_type 和 size_or_initializer)应该像以前一样工作 RawArray
  • 只要有共享数组,任何进程都可以访问它 tag 火柴。
  • 当原始对象(由...返回)时,共享内存段被取消链接 ShmemRawArray(..., create=True))被删除
  • 使用a创建共享数组 tag 目前存在的将提出一个 ExistentialError
  • 使用a访问共享阵列 tag 不存在的(或者已经取消链接的那个)也会引发一个 ExistentialError

一个 SSCCE (短,自包含,可编辑的例子)显示它在行动。

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

运行此结果会产生以下输出:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']

6
2017-09-16 15:36





我想你在找 mmap模块

关于数据的序列化 这个问题 答案当然如果你希望避免复制我没有解决方案

编辑

实际上,您可以使用CPython 3.2中的非stdlib _mutliprocessing模块来获取mmap对象的地址,并将其与ctypes对象的from_address一起使用 事实上,RawArray实际上是什么,当然你不应该尝试调整mmap对象的大小,因为在这种情况下mmap的地址可能会改变

import mmap
import _multiprocessing
from ctypes import Structure,c_int

map = mmap.mmap(-1,4)
class A(Structure):
    _fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256

>>> map[0:4]
'\x00\x01\x00\x00'

要在创建子项后公开内存,您必须使用正在调用的实际文件映射内存

map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)

0
2017-09-14 15:47



除非我弄错了,否则我需要序列化我的数据,然后在子进程中加载​​它。这就是我想要避免的。 - Shawn Chin
据我所知,你试图避免广播消息的问题不是真的避免序列化 - Xavier Combelle
我关心性能和内存使用情况。必须在所有过程中反序列化相同的数据听起来并不吸引人,因此我对序列化的评论。 - Shawn Chin
感谢Xavier的更新。我担心我不明白在我的情况下如何实现我自己的mmap模块会有所帮助。 - Shawn Chin
你的更新。这几乎就是在幕后做的事情 multiprocessing.sharedctypes.RawArray,这一切都像宣传的那样有效。我正在尝试向子进程公开共享内存 后 它们已经产生了。 - Shawn Chin