"""DAG script for downloading book data, processing it, and emailing a report."""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import timedelta, datetime
from airflow.models import Variable
import xml.etree.ElementTree as ET
import glob
# Arguments passed to the DAG as ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2017, 10, 6),
    email=['mahendra.k12@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# DAG object that every task below attaches to; the cron expression
# '0 0 * * *' schedules one run per day at midnight.
dag = DAG(
    dag_id='download_data',
    description='DAG for downloading, processing and loading data.',
    schedule_interval='0 0 * * *',
    default_args=default_args,
)
# Paths are read from Airflow Variables when this module is loaded.
# PROJECT_ROOT is used to locate scripts/download.sh.
PROJECT_ROOT = Variable.get(key="project_root")
# DATA_FOLDER is passed to the download script and is where the XML files
# are read from and the TSV output is written.
DATA_FOLDER = Variable.get(key="data_directory")
def parse_book_name():
    """Write the id, title and author of every book found in DATA_FOLDER.

    Parses each ``*.xml`` file in ``DATA_FOLDER``, locates every ``<book>``
    element below the root element, and writes one tab-separated line per
    book (``id<TAB>title<TAB>author``) to ``DATA_FOLDER/output_file.tsv``.
    Each line is also printed. The output file is opened in ``'w'`` mode,
    so it is overwritten on every call.

    A missing ``id`` attribute, or a missing or empty ``<title>`` /
    ``<author>`` child, produces an empty field.

    Raises:
        xml.etree.ElementTree.ParseError: if an XML file is not well formed.
    """
    output_path = DATA_FOLDER + "/output_file.tsv"
    # Sorted so the row order does not depend on the order glob returns files in.
    xml_paths = sorted(glob.glob(DATA_FOLDER + "/*.xml"))
    with open(output_path, 'w') as write_obj:
        for xml_path in xml_paths:
            root = ET.parse(xml_path).getroot()
            for book in root.findall('.//book'):
                book_id = book.get('id', '')
                # findtext() returns the default when the child is absent and
                # '' when it is present but empty, so the string "None" is
                # never written for an empty element.
                book_name = book.findtext('title', default='')
                book_author = book.findtext('author', default='')
                data = "{}\t{}\t{}".format(book_id, book_name, book_author)
                print(data)
                write_obj.write("{}\n".format(data))
# Runs <PROJECT_ROOT>/scripts/download.sh with DATA_FOLDER as its only argument.
download = BashOperator(
    dag=dag,
    task_id='download_task',
    bash_command='{root}/scripts/download.sh {dest}'.format(
        root=PROJECT_ROOT, dest=DATA_FOLDER),
)
# Runs parse_book_name as a Python task.
process = PythonOperator(
    dag=dag,
    task_id='processing_task',
    python_callable=parse_book_name,
    depends_on_past=False,
)
# E-mails a completion report. The subject and body contain Jinja
# placeholders ({{ ... }}) for the DAG id and execution dates.
send_report = EmailOperator(
    task_id='send_report_task',
    to='mahendra.k12@gmail.com',
    subject='DAG {{ task_instance.dag_id }} Execution date {{ execution_date }} completed!',
    # The <h4> heading is closed with </h4>; the original repeated the
    # opening tag, leaving the heading unclosed.
    html_content="""<h2>Hi Master</h2><h4>DAG {{ task_instance.dag_id }} completed</h4>
<p>Execution date: {{ execution_date }}</p>
<p>Previous_execution_date: {{ prev_execution_date }}</p>
<p>Next execution date: {{ next_execution_date }}</p>
""",
    dag=dag)
# Execution order: download, then process, then send_report.
download.set_downstream(process)
process.set_downstream(send_report)