问题 Python 2.6通过Queue / Pipe / etc发送连接对象


特定 这个bug(Python Issue 4892) 这会引起以下错误:

>>> import multiprocessing
>>> multiprocessing.allow_connection_pickling()
>>> q = multiprocessing.Queue()
>>> p = multiprocessing.Pipe()
>>> q.put(p)
>>> q.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File     "/.../python2.6/multiprocessing/queues.py", line 91, in get
    res = self._recv()
TypeError: Required argument 'handle' (pos 1) not found

有没有人知道在队列上传递Connection对象的解决方法?

谢谢。


1963
2017-09-18 17:55


起源



答案:


(我相信的是)一个更好的方法,经过一些游戏(我遇到了同样的问题。想要通过管道通过管道。)在发现这篇文章之前:

>>> from multiprocessing import Pipe, reduction
>>> i, o = Pipe()
>>> reduced = reduction.reduce_connection(i)
>>> newi = reduced[0](*reduced[1])
>>> newi.send("hi")
>>> o.recv()
'hi'

我不完全确定为什么这是以这种方式构建的(有人需要深入了解多处理的减少部分是什么)但它确实有效,并且不需要pickle导入。除此之外,它与它的作用非常接近,但更简单。我还把它扔进了python bug报告,以便其他人知道解决方法。


8
2018-01-18 02:24



很好的答案。绝对看起来是一个更好的选择。 - Brian M. Hunt
这是一个很好的答案,并在2.6中为我工作。但是,在2.7,当功能 reduction.rebuild_connection AKA reduced[0] 被调用,线程无限期地阻塞。 - Sam Magura
我和@SamMagura有同样的问题。有谁知道Python 2.7的解决方法? - redrah
为了使它适用于Windows,重建减少管道使用时 from multiprocessing.reduction import rebuild_pipe_connection 而不是 reduced[0]。 (使用Win7和python 2.7.5测试)。看到 bugs.python.org/msg157736 - Peervm


答案:


(我相信的是)一个更好的方法,经过一些游戏(我遇到了同样的问题。想要通过管道通过管道。)在发现这篇文章之前:

>>> from multiprocessing import Pipe, reduction
>>> i, o = Pipe()
>>> reduced = reduction.reduce_connection(i)
>>> newi = reduced[0](*reduced[1])
>>> newi.send("hi")
>>> o.recv()
'hi'

我不完全确定为什么这是以这种方式构建的(有人需要深入了解多处理的减少部分是什么)但它确实有效,并且不需要pickle导入。除此之外,它与它的作用非常接近,但更简单。我还把它扔进了python bug报告,以便其他人知道解决方法。


8
2018-01-18 02:24



很好的答案。绝对看起来是一个更好的选择。 - Brian M. Hunt
这是一个很好的答案,并在2.6中为我工作。但是,在2.7,当功能 reduction.rebuild_connection AKA reduced[0] 被调用,线程无限期地阻塞。 - Sam Magura
我和@SamMagura有同样的问题。有谁知道Python 2.7的解决方法? - redrah
为了使它适用于Windows,重建减少管道使用时 from multiprocessing.reduction import rebuild_pipe_connection 而不是 reduced[0]。 (使用Win7和python 2.7.5测试)。看到 bugs.python.org/msg157736 - Peervm


这大致是我做的:

# Producer
from multiprocessing.reduction import reduce_connection
from multiprocessing import Pipe

   # Producer and Consumer share the Queue we call queue
def handle(queue):
   reader, writer = Pipe()
   pickled_writer = pickle.dumps(reduce_connection(writer))
   queue.put(pickled_writer)

# Consumer
from multiprocessing.reduction import rebuild_connection

def wait_for_request():
    pickled_write = queue.get(block=True) # block=True isn't necessary, of course
    upw = pickle.loads(pickled_writer) # unpickled writer
    writer = upw[0](upw[1][0],upw[1][1],upw[1][2])

最后一行是神秘的,来自以下内容:

>>> upw
(<function rebuild_connection at 0x1005df140>,
(('/var/folders/.../pymp-VhT3wX/listener-FKMB0W',
17, False), True, True))

希望能帮助别人。这对我来说可以。


7
2017-09-18 19:20



谢谢,这是一个非常有用的回复,我们真的被卡住了! - EdwardAndo