我正在使用带有django和rabbitmq的芹菜来创建一个消息队列。我也有一个工人,它来自不同的机器。在django视图中,我正在开始这样的过程:
def processtask(request, name):
args = ["ls", "-l"]
MyTask.delay(args)
return HttpResponse("Task set to execute.")
我的任务配置如下:
class MyTask(Task):
def run(self, args):
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = p.communicate()
return out
我现在的问题是,代理(我的django项目)现在如何接收工作人员在其计算机上执行的“ls -l”命令的输出。我想最好的事情是,只要工作人员准备好从执行的命令发送输出,就可以在代理中调用函数。
我想异步接收worker的输出,然后用输出更新网页,但那是另一次。现在我只想收到工人的输出。
更新
现在我添加了一个HTTP GET请求,该请求在任务结束时触发,通知Web应用程序任务已完成 - 我还在http GET中发送task_id。 http GET方法调用django视图,它创建AsyncResult并获取结果,但问题是在调用时 result.get() 我收到以下错误:
/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
"Polling results with transaction isolation level"
有什么想法吗?我没有使用数据库,因为我正在使用带有AMQP的rabbitmq。
更新。
我非常想使用第三个选项,这似乎是最好的选择 - 对于小的和大的返回值。我的整个任务看起来像这样:
class MyTask(Task):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.webhost is not None:
conn = httplib.HTTPConnection(self.webhost, self.webport)
conn.request("HEAD", "/vuln/task/output/"+task_id)
def run(self, args, webhost=None, webport=None):
self.webhost = webhost
self.webport = webport
r = "This is a basic result string used for code clarity"
return r
所以我重写了after_return函数,它也应该释放我的任务锁,因为任务的run()函数已经返回了一个值。在HEAD请求中,我基本上调用了一个django函数,它在task_id上调用AsyncResult,它应该提供任务的结果。在我的情况下,我使用任意结果进行测试,因为它仅用于测试。
我想知道为什么上面的代码不起作用。我可以使用on_success,但我不认为它会有所作为 - 或者它会吗?