问题 芹菜 - 完成任务的召唤功能


我正在使用带有django和rabbitmq的芹菜来创建一个消息队列。我也有一个工人,它来自不同的机器。在django视图中,我正在开始这样的过程:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")

我的任务配置如下:

class MyTask(Task):
  def run(self, args):
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out

我现在的问题是,代理(我的django项目)现在如何接收工作人员在其计算机上执行的“ls -l”命令的输出。我想最好的事情是,只要工作人员准备好从执行的命令发送输出,就可以在代理中调用函数。

我想异步接收worker的输出,然后用输出更新网页,但那是另一次。现在我只想收到工人的输出。

更新

现在我添加了一个HTTP GET请求,该请求在任务结束时触发,通知Web应用程序任务已完成 - 我还在http GET中发送task_id。 http GET方法调用django视图,它创建AsyncResult并获取结果,但问题是在调用时 result.get() 我收到以下错误:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"

有什么想法吗?我没有使用数据库,因为我正在使用带有AMQP的rabbitmq。

更新。

我非常想使用第三个选项,这似乎是最好的选择 - 对于小的和大的返回值。我的整个任务看起来像这样:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r

所以我重写了after_return函数,它也应该释放我的任务锁,因为任务的run()函数已经返回了一个值。在HEAD请求中,我基本上调用了一个django函数,它在task_id上调用AsyncResult,它应该提供任务的结果。在我的情况下,我使用任意结果进行测试,因为它仅用于测试。

我想知道为什么上面的代码不起作用。我可以使用on_success,但我不认为它会有所作为 - 或者它会吗?


9980
2018-03-06 00:20


起源

你能在数据库中保存命令的输出吗? - jpic
嗨,不,因为工人无法访问经纪人的数据库,也不希望他们有权访问。我肯定需要发回一个结果然后在代理中处理它。 - eleanor
也许你可以制作一个HTTP API来发回结果?在Django中有一些非常简单的方法可以做到这一点。 - jpic
是的,我做了一个HTTP GET调用,它发送回一个ID。然后Web应用程序应该只读取任务的输出,但它不起作用 - 我已经用失败的结果更新了我的问题。 - eleanor
我不明白你在做什么 - 你没有发布你的代码。但是如果一个URL将用于存储结果,那么它肯定应该存在 不 在GET上,这将违反RFC2616。考虑POST。 - jpic


答案:


如果你看 这里 你会发现以下内容:

Django-celery使用MySQL来跟踪所有任务/结果,rabbit-mq基本上用作通信总线。

真正发生的是你正试图获取 ASyncResult 当任务仍在运行时(该任务调用了对您的服务器的HTTP请求,并且由于它尚未返回),来自该工作程序的数据库锁定会话仍处于活动状态且结果行仍处于锁定状态。当Django尝试读取任务结果(其状态和运行函数的实际返回值)时,它会发现行被锁定并向您发出警告。

有几种方法可以解决这个问题:

  1. 设置另一个芹菜任务以获得结果并将其链接到您的处理任务。这样原始任务将完成,释放数据库上的锁定,新的将获取它,在django中读取结果并执行您需要它做的任何事情。在这上面查看芹菜文档。

  2. 根本没有麻烦,只需对Django进行POST,并将完整的处理结果作为有效负载附加,而不是尝试通过db获取它。

  3. 覆盖任务类中的on_success并将通知请求POST到Django,然后在db表上释放锁。

请注意,在返回run方法(可能是pickle)时,您需要存储整个处理结果(无论它有多大)。你没有提到结果有多大,所以实际上只做上面的场景#2(这就是我要做的)可能是有意义的。或者我会选择#3。另外,不要忘记在任务中处理on_failure方法。


15
2018-03-07 13:54



感谢您的评论。我已经更新了我的答案,提出了另外的问题,在接受你的回答之前我需要回答这个问题,这真的很棒。 - eleanor