如果数据仅在子进程生成后可用,则如何为子进程提供对共享内存中数据的访问权限(使用 multiprocessing.Process)?
我知道 multiprocessing.sharedctypes.RawArray,但我无法弄清楚如何让我的子进程访问a RawArray
在进程已经开始之后创建。
数据由父进程生成,并且事先不知道数据量。
如果不是为了 GIL 我将使用线程,这将使这项任务更简单。使用非CPython实现不是一种选择。
在引擎盖下看 muliprocessing.sharedctypes,看起来分配了共享的ctype对象 运用 mmap
记忆。
所以这个问题真的归结为: 如果子进程可以访问匿名映射的内存 mmap()
在生成子进程后,父进程调用了它?
这有点像被问到的内容 这个问题,除了在我的情况下是调用者 mmap()
是父进程而不是子进程。
(解决了)
我创建了自己的版本 RawArray
用的 shm_open()
在引擎盖下。生成的共享ctypes数组可以与任何进程共享,只要标识符(tag
) 火柴。
看到 这个答案 有关详细信息和示例。
你的问题听起来非常适合 posix_ipc
要么 sysv_ipc
模块,公开POSIX或SysV API以获取共享内存,信号量和消息队列。这里的特征矩阵包括在他提供的模块中挑选的极好建议。
匿名的问题 mmap(2)
区域是你不能轻易地与其他进程共享 - 如果它们是文件支持的,它很容易,但如果你实际上不需要该文件的任何其他过程,它会感到愚蠢。您 可以 使用 CLONE_VM
国旗 clone(2)
系统调用,如果这是在C,但我不想尝试使用语言解释器,可能会对内存安全性做出假设。 (即使在C中它也有点危险,因为五年后维护程序员可能会这样做 也 感到震惊 CLONE_VM
行为。)
但SysV和更新的POSIX共享内存映射允许甚至不相关的进程通过标识符附加和分离共享内存,因此您需要做的就是从创建与使用映射的进程的映射的进程共享标识符,然后当您在映射中操作数据时,它们可同时用于所有进程,而无需任何额外的解析开销。该 shm_open(3)
函数返回一个 int
在以后的调用中用作文件描述符 ftruncate(2)
接着 mmap(2)
,因此其他进程可以使用共享内存段而不在文件系统中创建文件 - 即使所有使用它的进程都已退出,此内存仍将保留。 (或许对Unix来说有点奇怪,但它很灵活。)
免责声明:我是问题的作者。
我最终用了 posix_ipc 模块创建我自己的版本 RawArray。我主要使用 posix_ipc.SharedMemory
哪个叫 shm_open()
在引擎盖下。
我的实施(ShmemRawArray
)暴露相同的功能 RawArray
但需要两个额外的参数 - a tag
唯一地标识共享内存区域,和 create
用于确定是否应创建新的共享内存段或附加到现有内存段的标志。
如果有人有兴趣,这是一份副本: https://gist.github.com/1222327
ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)
使用说明:
- 前两个args(
typecode_or_type
和 size_or_initializer
)应该像以前一样工作 RawArray
。
- 只要有共享数组,任何进程都可以访问它
tag
火柴。
- 当原始对象(由...返回)时,共享内存段被取消链接
ShmemRawArray(..., create=True)
)被删除
- 使用a创建共享数组
tag
目前存在的将提出一个 ExistentialError
- 使用a访问共享阵列
tag
不存在的(或者已经取消链接的那个)也会引发一个 ExistentialError
一个 SSCCE (短,自包含,可编辑的例子)显示它在行动。
#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray
class Point(ctypes.Structure):
_fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]
def worker(q):
# get access to ctypes array shared by parent
count, tag = q.get()
shared_data = ShmemRawArray(Point, count, tag, False)
proc_name = multiprocessing.current_process().name
print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
if __name__ == '__main__':
procs = []
np = multiprocessing.cpu_count()
queue = multiprocessing.Queue()
# spawn child processes
for i in xrange(np):
p = multiprocessing.Process(target=worker, args=(queue,))
procs.append(p)
p.start()
# create a unique tag for shmem segment
tag = "stack-overflow-%d" % multiprocessing.current_process().pid
# random number of points with random data
count = randint(3,10)
combined_data = [Point(x=random(), y=random()) for i in xrange(count)]
# create ctypes array in shared memory using ShmemRawArray
# - we won't be able to use multiprocssing.sharectypes.RawArray here
# because children already spawned
shared_data = ShmemRawArray(Point, combined_data, tag)
# give children info needed to access ctypes array
for p in procs:
queue.put((count, tag))
print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
for p in procs:
p.join()
运行此结果会产生以下输出:
[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
我想你在找 mmap模块
关于数据的序列化 这个问题 答案当然如果你希望避免复制我没有解决方案
编辑
实际上,您可以使用CPython 3.2中的非stdlib _mutliprocessing模块来获取mmap对象的地址,并将其与ctypes对象的from_address一起使用
事实上,RawArray实际上是什么,当然你不应该尝试调整mmap对象的大小,因为在这种情况下mmap的地址可能会改变
import mmap
import _multiprocessing
from ctypes import Structure,c_int
map = mmap.mmap(-1,4)
class A(Structure):
_fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256
>>> map[0:4]
'\x00\x01\x00\x00'
要在创建子项后公开内存,您必须使用正在调用的实际文件映射内存
map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)