leongkui
11/21/2012 - 1:46 PM

Sample Celery chain usage for a file-processing pipeline

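Demonstrates chaining Celery tasks (fetch -> blacklist -> transform -> load), where each task passes a file path to the next, and starting the chain from a Django management command.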

import os
import shutil
import tempfile

from celery import task


@task()
def fetch(fixture_path, destination=None):
    """
    Copy the file at *fixture_path* into a local working file.

    First stage of the pipeline. Despite the name, the current implementation
    only copies a file reachable on the worker's filesystem (via
    ``shutil.copyfile``); replace it with a real download for remote sources.

    :param fixture_path: path of the file to copy.
    :param destination: optional output path. When omitted, a fresh,
        uniquely named ``.csv`` file is created with ``tempfile.mkstemp``.
        A single fixed path would let concurrent pipeline runs overwrite
        (and later delete) each other's data. Pass ``"/tmp/source.csv"``
        to get the old fixed-path behaviour.
    :returns: the path of the written copy.
    """
    created = False
    if destination is None:
        fd, destination = tempfile.mkstemp(suffix=".csv")
        # mkstemp returns an open descriptor; copyfile reopens by path.
        os.close(fd)
        created = True
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    try:
        shutil.copyfile(fixture_path, destination)
    except Exception:
        # Don't leave an empty temp file behind if the copy fails.
        if created:
            os.remove(destination)
        raise
    return destination


@task()
def blacklist(source_path):
    """
    Pipeline stage: remove blacklisted records from *source_path*.

    Placeholder implementation: the file is copied unchanged to
    ``<base>-afterblacklist<ext>`` next to the input, and no records are
    filtered yet. The input file is left in place.

    :param source_path: path of the file produced by the previous stage.
    :returns: path of the new copy.
    """
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    # Stage-specific message so this step is distinguishable from
    # ``transform`` in worker logs.
    print("Removing blacklisted records from %s - saving to %s"
          % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination


@task()
def transform(source_path):
    """
    Pipeline stage: reshape the data so it is ready for loading.

    Currently a pass-through: writes an identical copy named
    ``<base>-transformed<ext>`` alongside *source_path* and returns that
    copy's path. The input file is left in place.
    """
    stem, suffix = os.path.splitext(source_path)
    target = "%s-transformed%s" % (stem, suffix)
    print("Transforming data in %s to %s" % (source_path, target))
    shutil.copyfile(source_path, target)
    return target


@task()
def load(filepath):
    """
    Final pipeline stage: load the processed file, then delete it.

    Currently this only logs and deletes *filepath*; no database write
    happens here yet. Files written by earlier stages are not removed.
    """
    message = "Loading data in %s and removing" % filepath
    print(message)
    os.unlink(filepath)
from celery import chain

from django.core.management.base import BaseCommand

from . import tasks


class Command(BaseCommand):
    """
    Queue the fetch -> blacklist -> transform -> load pipeline for one file.

    The tasks are linked with a Celery ``chain``. The first signature is
    bound to the source file, and each later signature is left without
    arguments so that it receives the previous task's return value (a file
    path).
    """

    help = "Queue the fetch/blacklist/transform/load pipeline for a file."
    args = "<source_file>"

    def handle(self, *args, **kwargs):
        # CommandError makes Django print a clean error message instead of
        # the IndexError traceback that ``args[0]`` produced with no argument.
        from django.core.management.base import CommandError

        if len(args) != 1:
            raise CommandError(
                "Expected exactly one argument: <source_file> (got %d)"
                % len(args))
        source_file = args[0]
        chain(
            tasks.fetch.s(source_file),         # Fetch data from the source path
            tasks.blacklist.s(),                # Remove blacklisted records
            tasks.transform.s(),                # Transform raw data ready for loading
            tasks.load.s(),                     # Load into DB
        ).apply_async()