问题 完成所有任务后运行任务


我正在编写一个需要并行运行一系列任务的应用程序,然后运行所有任务结果的单个任务:

@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print str(values)

这是一个非常人为和过于简单的例子,但希望这一点很好。基本上,我有 许多 需要经过的项目 power,但我只想跑 amass 关于所有任务的结果。所有这一切都应该异步发生,我不需要任何回复 amass 方法。

有没有人知道如何在芹菜中设置它,以便一切都是异步执行的,并且在完成所有操作后调用带有结果列表的单个回调?

我已经设置了这个例子来运行一个 chord 正如Alexander Afanasiev推荐的:

from time import sleep

import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

不幸的是,在上面的例子中,所有的任务都在 tasks 只有当时才开始 chord 方法被调用。有没有办法让每个任务可以单独启动,然后我可以添加一个回调组,以便在一切都完成后运行?


9141
2018-04-30 20:57


起源



答案:


芹菜有 很多工具 对于您可以想象的大多数工作流程。

看来你需要利用它 。这是文档的引用:

和弦就像一个群体,但有一个回调。和弦由   标题组和正文,其中正文是应该执行的任务   在标题中的所有任务完成后执行。


4
2018-05-01 04:03



这绝对是正确的,然而,它有一个问题。我已经用细节更新了我的答案。 - Naftuli Kay


答案:


芹菜有 很多工具 对于您可以想象的大多数工作流程。

看来你需要利用它 。这是文档的引用:

和弦就像一个群体,但有一个回调。和弦由   标题组和正文,其中正文是应该执行的任务   在标题中的所有任务完成后执行。


4
2018-05-01 04:03



这绝对是正确的,然而,它有一个问题。我已经用细节更新了我的答案。 - Naftuli Kay


这是一个适用于我的目的的解决方案:

tasks.py

from time import sleep

import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())

    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))

    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        amass.delay(results, tasks, countdown=1)
    else:
        # we done
        print results

使用案例:

tasks = []

for i in xrange(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)

这是什么 应该 do是异步启动所有任务。一旦他们全部被发布到队列中,那么 amass 任务也将被发布到队列中。 amass任务将继续重新发布,直到所有其他任务完成。


4
2018-05-03 00:07



嗨,看起来上面是一个很好的方法,至少在概念上。但是,当我尝试它时,与上面完全相同的代码,它会引发以下错误: EncodeError: <AsyncResult: cf5875f1-7f72-449c-9808-07c9c9459737> is not JSON serializable  非常感谢这里的一些帮助。 - qre0ct
好的,我通过直接传递final_task()一个taskIds本身列表而不是像上面的代码示例中那样传递任务对象列表来解决上述错误。无论如何,谢谢你的回答。它帮助了很多。 - qre0ct


从你的问题看一下这个片段,看起来你正在传递一个 list 作为和弦标题,而不是一个 group

from time import sleep
import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

转换 list 到了 group 应该导致你期望的行为:

...

callback = amass.s()

tasks = group(tasks)

r = chord(tasks)(callback)

2
2017-07-10 00:38



upvoted!任何想法,如果每个组开始一个新的任务,你希望和弦说等待每个组的子任务完成 - PirateApp
对不起,我已经有一段时间了,因为我详细研究了Celery,所以我不知道4.x的工作原理 - Tim McNamara


@ alexander-afanasiev给你的答案基本上是正确的:使用和弦。

你的代码没问题,但是 tasks.append(power.s((i, 2))) 实际上并没有执行子任务,只是添加 子任务  到列表。它的 chord(...)(...) 作为您定义的子任务向代理发送尽可能多的消息的消息 tasks list,再加上一个回调子任务消息。你打电话时 chord 它会尽快返回。

如果您想知道和弦何时结束,您可以使用单个任务轮询完成 r.ready() 在你的样本中。


0
2018-05-02 09:23



我希望每个子任务在发布后立即执行,而不是在发布和弦时执行。那可能吗? - Naftuli Kay
好吧,做一个 power.delay(i, 2) 在循环中并在调用之前轮询所有中间结果以完成 amass(results)。但我真的没有看到这一点。使用和弦将执行 power.s 子任务一旦在代理中作为消息可用,就会立即生成 amass 他们完成后我认为你应该澄清你想要实现的目标,因为你似乎希望异步执行任务与你提出的用法相矛盾。 - enlavin
我想出了一个解决方案,上面展示了我想要做的事情。 - Naftuli Kay
我想你刚刚重新实现了 chord 功能。 - enlavin
完全没有:我想你不明白我在问什么。和弦函数在调用之前不会开始执行任务。但是,在我的代码中,任务立即开始执行,然后完成处理程序也会发布到队列中。在我的用例中,我不会在未来的某个时刻调用实际的和弦函数,因为我已经开始了所有必要的工作任务。本质上,和弦函数是懒惰的,只有当它们全部被发布并且调用和弦函数时才启动任务。我的实施非常热切,因为它尽快启动所有任务。 - Naftuli Kay