我正在编写一个需要并行运行一系列任务的应用程序,然后运行所有任务结果的单个任务:
@celery.task
def power(value, expo):
return value ** expo
@celery.task
def amass(values):
print str(values)
这是一个非常人为和过于简单的例子,但希望这一点很好。基本上,我有 许多 需要经过的项目 power
,但我只想跑 amass
关于所有任务的结果。所有这一切都应该异步发生,我不需要任何回复 amass
方法。
有没有人知道如何在芹菜中设置它,以便一切都是异步执行的,并且在完成所有操作后调用带有结果列表的单个回调?
我已经设置了这个例子来运行一个 chord
正如Alexander Afanasiev推荐的:
from time import sleep
import random
tasks = []
for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
不幸的是,在上面的例子中,所有的任务都在 tasks
只有当时才开始 chord
方法被调用。有没有办法让每个任务可以单独启动,然后我可以添加一个回调组,以便在一切都完成后运行?
芹菜有 很多工具 对于您可以想象的大多数工作流程。
看来你需要利用它 弦。这是文档的引用:
和弦就像一个群体,但有一个回调。和弦由
标题组和正文,其中正文是应该执行的任务
在标题中的所有任务完成后执行。
芹菜有 很多工具 对于您可以想象的大多数工作流程。
看来你需要利用它 弦。这是文档的引用:
和弦就像一个群体,但有一个回调。和弦由
标题组和正文,其中正文是应该执行的任务
在标题中的所有任务完成后执行。
这是一个适用于我的目的的解决方案:
tasks.py:
from time import sleep
import random
@celery.task
def power(value, expo):
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
return value ** expo
@celery.task
def amass(results, tasks):
completed_tasks = []
for task in tasks:
if task.ready():
completed_tasks.append(task)
results.append(task.get())
# remove completed tasks
tasks = list(set(tasks) - set(completed_tasks))
if len(tasks) > 0:
# resend the task to execute at least 1 second from now
amass.delay(results, tasks, countdown=1)
else:
# we done
print results
使用案例:
tasks = []
for i in xrange(10):
tasks.append(power.delay(i, 2))
amass.delay([], tasks)
这是什么 应该 do是异步启动所有任务。一旦他们全部被发布到队列中,那么 amass
任务也将被发布到队列中。 amass任务将继续重新发布,直到所有其他任务完成。
从你的问题看一下这个片段,看起来你正在传递一个 list
作为和弦标题,而不是一个 group
:
from time import sleep
import random
tasks = []
for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
转换 list
到了 group
应该导致你期望的行为:
...
callback = amass.s()
tasks = group(tasks)
r = chord(tasks)(callback)
@ alexander-afanasiev给你的答案基本上是正确的:使用和弦。
你的代码没问题,但是 tasks.append(power.s((i, 2)))
实际上并没有执行子任务,只是添加 子任务 到列表。它的 chord(...)(...)
作为您定义的子任务向代理发送尽可能多的消息的消息 tasks
list,再加上一个回调子任务消息。你打电话时 chord
它会尽快返回。
如果您想知道和弦何时结束,您可以使用单个任务轮询完成 r.ready()
在你的样本中。