userimack
10/8/2017 - 4:23 AM

Airflow DAG script that downloads data, processes it, and emails a report.

Airflow DAG script that downloads data, processes it, and emails a report.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import timedelta, datetime
from airflow.models import Variable
import xml.etree.ElementTree as ET
import glob

# Task-level defaults handed to the DAG; individual tasks may override them.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(year=2017, month=10, day=6),
    # Alert recipients, and the task events that trigger an alert mail.
    email=['mahendra.k12@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    # A failed task is retried once, five minutes after the failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Pipeline definition; the cron expression '0 0 * * *' fires once a day at
# midnight.
dag = DAG(
    dag_id='download_data',
    description='DAG for downloading, processing and loading data.',
    schedule_interval='0 0 * * *',
    default_args=default_args,
)

# Filesystem locations come from Airflow Variables so they can be changed
# without editing this file. Both lookups run when the module is imported.
PROJECT_ROOT = Variable.get("project_root")
DATA_FOLDER = Variable.get("data_directory")


def parse_book_name(data_folder=None):
    """Write id, title and author of every book found in the XML files to a TSV.

    Every ``*.xml`` file directly inside the data folder is parsed. For each
    ``<book>`` element below the document root, one tab-separated line
    ``id<TAB>title<TAB>author`` is printed and appended to
    ``output_file.tsv`` in the same folder. The output file is truncated at
    the start of every call.

    A missing ``id`` attribute, a missing ``<title>``/``<author>`` child, or
    a child that is present but has no text all produce an empty field.

    Args:
        data_folder: Directory that holds the XML files and receives the
            TSV. Defaults to the module-level ``DATA_FOLDER`` so the function
            can still be called without arguments.

    Raises:
        xml.etree.ElementTree.ParseError: If one of the XML files is
            malformed.
        IOError: If the output file cannot be opened for writing.
    """
    # Function-scope import keeps this block self-contained; the module does
    # not import ``os`` at the top.
    import os

    if data_folder is None:
        data_folder = DATA_FOLDER

    output_path = os.path.join(data_folder, "output_file.tsv")
    # glob returns paths in arbitrary order; sorting makes the row order of
    # the TSV reproducible from run to run.
    xml_paths = sorted(glob.glob(os.path.join(data_folder, "*.xml")))

    with open(output_path, 'w') as write_obj:
        for filename in xml_paths:
            root = ET.parse(filename).getroot()
            for book in root.findall('.//book'):
                # findtext() yields '' both when the child is absent (via
                # ``default``) and when it exists without text, so the
                # literal string "None" can never end up in the output.
                data = "{}\t{}\t{}".format(
                    book.get('id', ''),
                    book.findtext('title', default=''),
                    book.findtext('author', default=''),
                )
                print(data)
                write_obj.write("{}\n".format(data))


# Task 1: run the project's download script, passing the data folder as its
# single command-line argument.
download = BashOperator(
    task_id='download_task',
    bash_command='{0}/scripts/download.sh {1}'.format(PROJECT_ROOT, DATA_FOLDER),
    dag=dag)

# Task 2: call parse_book_name to turn the XML files in the data folder into
# output_file.tsv.
process = PythonOperator(
    task_id='processing_task',
    depends_on_past=False,
    python_callable=parse_book_name,
    dag=dag)


# Task 3: mail a completion report. The subject and body are Jinja templates;
# the {{ ... }} placeholders are filled in by Airflow when the task runs.
send_report = EmailOperator(
    task_id='send_report_task',
    to='mahendra.k12@gmail.com',
    subject='DAG {{ task_instance.dag_id }} Execution date {{ execution_date }} completed!',
    html_content="""<h2>Hi Master</h2><h4>DAG {{ task_instance.dag_id }} completed</h4>
    <p>Execution date: {{ execution_date }}</p>
    <p>Previous_execution_date: {{ prev_execution_date }}</p>
    <p>Next execution date: {{ next_execution_date }}</p>
    """,
    dag=dag)


# Strictly linear pipeline: download, then process, then report.
download >> process >> send_report