问题 使用Celery作为Twisted应用程序的控制通道


我正在尝试使用Celery作为Twisted应用程序的控制通道。 My Twisted应用程序是一个抽象层,为各种本地运行的进程提供标准接口(通过ProcessProtocol)。我想用Celery远程控制它 - AMQP似乎是从中心位置控制许多Twisted应用程序的理想方法,我想利用Celery的基于任务的功能,例如:任务重试,子任务等

这不符合我的计划,我希望有人可以帮我指出正确的方向来实现这个目标。

我在运行脚本时尝试实现的行为是:

  • 开始略微修改的芹菜(见 下面)
  • 等待Celery任务
  • 收到“启动进程”任务后,会生成ProcessProtocol
  • 收到其他任务后,在Twisted协议上运行一个函数并使用Deferreds返回结果

'略微改良的芹菜'是 celeryd 通过一个小的修改,允许任务通过self.app.twisted访问Twisted反应器,并通过self.app.process生成进程。为了简单起见,我使用的是Celery的“独立”流程池实现,它不会为任务工作者分配新流程。

当我尝试使用Celery任务初始化ProcessProtocol(即启动外部进程)时,会出现问题。该过程正确启动,但ProcessProtocol的childDataReceived永远不会被调用。我认为这与文件描述符没有被正确继承/设置有关。

下面是一些示例代码,基于ProcessProtocol文档中的“wc”示例。它包括两个Celery任务 - 一个用于启动wc进程,另一个用于计算某些文本中的单词(使用之前启动的wc进程)。

这个例子是相当有意思的,但是如果我可以使它工作,它将作为实现我的ProcessProtocols的一个良好的起点,这是长时间运行的进程,它将响应写入stdin的命令。

我通过首先运行Celery守护进程来测试它:

python2.6 mycelery.py -l info -P solo

然后,在另一个窗口中,运行一个发送两个任务的脚本:

python2.6 command_test.py

command_test.py的预期行为是执行两个命令 - 一个启动wc进程,另一个发送一些文本到CountWordsTask。实际发生的是:

  • StartProcTask生成进程,并通过Deffered接收'process started'作为响应
  • CountWordsTask永远不会收到结果,因为从不调用childDataReceived

任何人都可以对此有所了解,或就如何最好地使用Celery作为Twisted ProcessProtocols的控制通道提供一些建议?

为Celery编写一个Twisted支持的ProcessPool实现会更好吗?我通过reactor.callLater调用WorkerCommand.execute_from_commandline的方法是否正确,以确保在Twisted线程内发生的一切?

我已经读过AMPoule,我认为它可以提供一些这样的功能,但是如果可能的话我想坚持使用Celery,因为我在我的应用程序的其他部分使用它。

任何帮助或帮助将不胜感激!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()

if __name__ == '__main__':
    main()

protocol.py

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text
        self._waiting = {} # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        fieldLength = len(data) / 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)

        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'


    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        self.transport.write(text)
        return self._waiting['countWords']

tasks.py

from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']

class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')

10485
2017-11-15 13:45


起源



答案:


Celery可能在等待来自网络的新消息时阻塞。由于您在一个单螺纹过程中与Twisted反应器一起运行它,它会阻止反应堆运行。这将禁用大部分Twisted,这需要反应堆实际运行(你调用 reactor.run,但随着Celery阻止它,它实际上没有运行)。

reactor.callLater 只是推迟了芹菜的启动。一旦芹菜开始,它仍然阻塞反应堆。

您需要避免的问题是阻塞反应堆。

一种解决方案是在一个线程中运行Celery,在另一个线程中运行反应器。使用 reactor.callFromThread 从Celery线程向Twisted(“反应器线程中的调用函数”)发送消息。如果需要从Twisted线程将消息发送回Celery,请使用Celery等效项。

另一种解决方案是实施Celery协议(AMQP? - 见 txAMQP)作为本机Twisted库并使用它来处理Celery消息而不会阻塞。


11
2017-11-15 16:24



感谢您抽出宝贵时间作出回应。 - Mike Ryan
糟糕,压入太快了。感谢您抽出宝贵时间作出回应。您是否介意告诉我使用此方法从Celery的线程中使用spawnProcess的最佳方法?我是否也需要通过callFromThread调用它?我已经使用txAMQP为此构建了一个概念验证,但我发现自己试图复制Celery的大部分功能,所以我想我会尝试直接在Twisted中使用Celery。 - Mike Ryan
只是对其他任何试图在将来做类似事情的人进行快速跟进。基于上述建议,我通过启动Celery来实现这一目标 reactor.callInThread(worker.execute_from_commandline)然后打电话 blockingCallFromThread(reactor, reactor.spawnProcess, *args, **kwargs) 来自我的Celery任务代码。一世 几乎 让这个工作完全按照计划 - 一个更基于缓冲的问题来解决,我设置。再次感谢Jean-Paul和Glyph的帮助! - Mike Ryan
为了完整性,有一个库在一个线程中运行Twisted reactor pypi.python.org/pypi/crochet 如今 - nanonyme


答案:


Celery可能在等待来自网络的新消息时阻塞。由于您在一个单螺纹过程中与Twisted反应器一起运行它,它会阻止反应堆运行。这将禁用大部分Twisted,这需要反应堆实际运行(你调用 reactor.run,但随着Celery阻止它,它实际上没有运行)。

reactor.callLater 只是推迟了芹菜的启动。一旦芹菜开始,它仍然阻塞反应堆。

您需要避免的问题是阻塞反应堆。

一种解决方案是在一个线程中运行Celery,在另一个线程中运行反应器。使用 reactor.callFromThread 从Celery线程向Twisted(“反应器线程中的调用函数”)发送消息。如果需要从Twisted线程将消息发送回Celery,请使用Celery等效项。

另一种解决方案是实施Celery协议(AMQP? - 见 txAMQP)作为本机Twisted库并使用它来处理Celery消息而不会阻塞。


11
2017-11-15 16:24



感谢您抽出宝贵时间作出回应。 - Mike Ryan
糟糕,压入太快了。感谢您抽出宝贵时间作出回应。您是否介意告诉我使用此方法从Celery的线程中使用spawnProcess的最佳方法?我是否也需要通过callFromThread调用它?我已经使用txAMQP为此构建了一个概念验证,但我发现自己试图复制Celery的大部分功能,所以我想我会尝试直接在Twisted中使用Celery。 - Mike Ryan
只是对其他任何试图在将来做类似事情的人进行快速跟进。基于上述建议,我通过启动Celery来实现这一目标 reactor.callInThread(worker.execute_from_commandline)然后打电话 blockingCallFromThread(reactor, reactor.spawnProcess, *args, **kwargs) 来自我的Celery任务代码。一世 几乎 让这个工作完全按照计划 - 一个更基于缓冲的问题来解决,我设置。再次感谢Jean-Paul和Glyph的帮助! - Mike Ryan
为了完整性,有一个库在一个线程中运行Twisted reactor pypi.python.org/pypi/crochet 如今 - nanonyme