Sample usage of a Celery chain for a data-processing pipeline
import os
import shutil
import tempfile

from celery import task
@task()
def fetch(fixture_path, destination=None):
    """
    Fetch a file from a remote location.

    In this sample the "remote location" is a path that is copied with
    ``shutil.copyfile``.

    :param fixture_path: path of the file to fetch.
    :param destination: where to store the copy. When omitted, a new,
        uniquely named ``.csv`` file is created in the system temp directory.
    :returns: path of the fetched copy; the next task in the chain receives
        it as its argument.
    """
    created = destination is None
    if created:
        # A fixed name such as /tmp/source.csv would be shared by every run:
        # concurrent pipelines on the same host would overwrite (and later
        # delete) each other's data. mkstemp creates a unique file atomically,
        # which also rules out a pre-planted symlink at a predictable path.
        fd, destination = tempfile.mkstemp(prefix="source-", suffix=".csv")
        os.close(fd)
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    try:
        shutil.copyfile(fixture_path, destination)
    except Exception:
        # Don't leave an empty temp file behind when the fetch fails.
        if created:
            os.remove(destination)
        raise
    return destination
@task()
def blacklist(source_path):
    """
    Remove blacklisted records from the file at ``source_path``.

    NOTE(review): this sample performs no filtering yet; it only copies the
    input to ``<base>-afterblacklist<ext>`` next to it.

    :param source_path: path produced by the previous task in the chain.
    :returns: path of the new file, passed on to the next task.
    """
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    # Log this step under its own name so blacklist and transform runs are
    # distinguishable in worker output.
    print("Blacklisting data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
@task()
def transform(source_path):
    """
    Transform raw data so it is ready for loading.

    NOTE(review): this sample applies no transformation yet; it only copies
    the input to ``<base>-transformed<ext>`` next to it.

    :param source_path: path produced by the previous task in the chain.
    :returns: path of the new file, passed on to the next task.
    """
    base, ext = os.path.splitext(source_path)
    destination = "%s-transformed%s" % (base, ext)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("Transforming data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
@task()
def load(filepath):
    """
    Load the data in ``filepath`` into the database, then delete the file.

    NOTE(review): no database load happens in this sample; the file is only
    removed. Files written by earlier steps of the chain are not cleaned up
    here.

    :param filepath: path produced by the previous task in the chain.
    """
    print("Loading data in %s and removing" % filepath)
    os.remove(filepath)


from celery import chain
from django.core.management.base import BaseCommand
from . import tasks
class Command(BaseCommand):
    """Queue the fetch -> blacklist -> transform -> load chain asynchronously."""

    help = "Run the Celery processing pipeline on the given source file."

    def add_arguments(self, parser):
        # Django >= 1.8 parses command lines with argparse and rejects
        # positional arguments that are not declared here.
        parser.add_argument("source_file", help="Path of the file to process.")

    def handle(self, *args, **kwargs):
        # Prefer the parsed option; fall back to positional args for old
        # (optparse-based) Django or direct handle() calls.
        source_file = kwargs.get("source_file") or (args[0] if args else None)
        if not source_file:
            from django.core.management.base import CommandError
            raise CommandError("A source file path is required.")
        chain(
            tasks.fetch.s(source_file),  # Fetch data from remote source
            tasks.blacklist.s(),  # Remove blacklisted records
            tasks.transform.s(),  # Transform raw data ready for loading
            tasks.load.s(),  # Load into DB
        ).apply_async()