celery workflow test #temp
from celery import shared_task, chord, chain, group
@shared_task()
def workflow_test(*args, name="", time=2):
print('name [%s] %s' % (name, args))
sleep(time)
return name
chord(
header=group(
chain(workflow_test.s(name='dis 1'), workflow_test.s(name='dipha 1')),
chain(workflow_test.s(name='dis 2'), workflow_test.s(name='dipha 2')),
chain(workflow_test.s(name='dis 3'), workflow_test.s(name='dipha 3'))
),
body=chain(
chord(
header=workflow_test.s(name='dispatch 1'),
body=workflow_test.s(name='dispatch 2'),
),
workflow_test.s(name='find best graph')
),
).delay('xxx.csv', 1233)