yaodong
3/4/2017 - 6:29 AM

celery workflow test #temp

celery workflow test #temp

from celery import shared_task, chord, chain, group

@shared_task()
def workflow_test(*args, name: str = "", time: float = 2) -> str:
    """Log the call, sleep for ``time`` seconds, and return ``name``.

    Placeholder task for exercising Celery workflows.

    Args:
        *args: Arbitrary positional arguments; they are only printed.
            Presumably this is where Celery delivers upstream results
            when the task is linked into a chain or chord -- confirm
            against the canvas documentation.
        name: Label printed in the log line and returned to the caller.
        time: Number of seconds to sleep before returning.

    Returns:
        The ``name`` that was passed in.
    """
    # Imported locally: nothing at module level provides ``sleep``, and the
    # ``time`` parameter shadows the module name inside this function, so
    # ``time.sleep`` would not be usable here.
    from time import sleep

    print('name [%s] %s' % (name, args))
    sleep(time)
    return name
    
# Fan-out stage: three two-step chains, each a 'dis N' task followed by a
# 'dipha N' task, bundled into a single group.
_branches = [
    chain(workflow_test.s(name=first), workflow_test.s(name=second))
    for first, second in (
        ('dis 1', 'dipha 1'),
        ('dis 2', 'dipha 2'),
        ('dis 3', 'dipha 3'),
    )
]
_fan_out = group(*_branches)

# Callback stage: a nested chord ('dispatch 1' as header, 'dispatch 2' as
# body) chained with the final 'find best graph' task.
_dispatch = chord(
    header=workflow_test.s(name='dispatch 1'),
    body=workflow_test.s(name='dispatch 2'),
)
_callback = chain(_dispatch, workflow_test.s(name='find best graph'))

# Submit the whole workflow. The two positional values look like
# placeholder inputs for this test -- confirm before reusing.
chord(header=_fan_out, body=_callback).delay('xxx.csv', 1233)