# -*- coding: utf-8 -*-
"""
Created on Tue Apr 16 10:54:51 2019
@author: AMaass
"""
#BLOCK 0: Required modules to execute script
import glob
import json
import pandas as pd
import boto3
import pyodbc
import os
import os.path
from os import path
import gzip
import shutil
import ntpath
from pandas.io.json import json_normalize
from datetime import datetime, timedelta
#BLOCK 1: EXTRACT - PT. 1 OF 2
#This block establishes the connection to the Tealium AWS bucket for Echo.
s3 = boto3.resource('s3', aws_access_key_id = 'AKIAJ24563M7ON7SAUJQ', aws_secret_access_key = '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' )
my_bucket = s3.Bucket('dataaccess-us-east-1.tealiumiq.com')
client = boto3.client('s3', aws_access_key_id= 'AKIAJ24563M7ON7SAUJQ', aws_secret_access_key = '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')
files = client.list_objects_v2(Bucket='dataaccess-us-east-1.tealiumiq.com',Prefix="echo/main/events/all_events")
aws_path = r's3://dataaccess-us-east-1.tealiumiq.com/echo/main/'
filename = r'C:\Users\amaass\Desktop\KissmetricsDestination\TealiumEventStore_FileDump'
#It can be useful to print 'files' to see which attributes are present in the object metadata returned from Tealium's AWS bucket, should the need arise to modify the extract logic.
print(files)
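#A minimal sketch of inspecting that listing directly (an optional aid, not required by the ETL):
#the list_objects_v2 response is a dict whose 'Contents' entries, when present, each carry the
#object's 'Key', 'LastModified', and 'Size'.
for listed_obj in files.get('Contents', []):
    print(listed_obj['Key'], listed_obj['LastModified'], listed_obj['Size'])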
#This code establishes the reference datetime used to determine which files to pull from Tealium's AWS bucket
now = datetime.now()
now = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = now - timedelta(hours=24)
#This code downloads the files from Tealium that were created the previous day. Note that they are downloaded to the script's working directory, which currently is 'C:\Users\amaass\Documents\FilesFromTealium'
for obj in my_bucket.objects.filter(Prefix='echo/main/events/all_events/'):
    km = client.head_object(Bucket = 'dataaccess-us-east-1.tealiumiq.com', Key = obj.key)
    kmdt = datetime.strptime(km['ResponseMetadata']['HTTPHeaders']['last-modified'][5:-4], '%d %b %Y %H:%M:%S')
    kmdt = kmdt.replace(hour=0, minute=0, second=0, microsecond=0)
    print(kmdt)
    if kmdt == yesterday:
        aws_path, filename = os.path.split(obj.key)
        my_bucket.download_file(obj.key, filename)
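#A lighter-weight alternative sketch, not called anywhere: each ObjectSummary already exposes a
#timezone-aware last_modified datetime, so the same date filter could be applied without the
#per-object head_object call above.
def was_modified_yesterday(s3_obj, cutoff=yesterday):
    #drop the time-of-day and timezone so the comparison matches the naive midnight 'yesterday' value
    modified = s3_obj.last_modified.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
    return modified == cutoff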
#BLOCK 2: EXTRACT - PT. 2 OF 2
#This block unzips the .gz files that come from Tealium's S3 bucket
source_dir = r'C:\Users\amaass\Documents\FilesFromTealium'
dest_dir = r'C:\Users\amaass\Documents\Informatica'
for src_name in glob.glob(os.path.join(source_dir, '*.gz')):
    base = os.path.basename(src_name)
    dest_name = os.path.join(dest_dir, base[:-3])
    if not os.path.exists(dest_name):
        with gzip.open(src_name, 'rb') as infile, open(dest_name, 'wb') as outfile:
            try:
                for line in infile:
                    outfile.write(line)
            except EOFError:
                print("End of file error occurred.")
            except Exception:
                print("Some error occurred.")
#BLOCK 3: TRANSFORM
#This block transforms the semi-structured JSON data into a proper dataframe, and then transforms the dataframe into a list of rows for ingestion.
#identifies all files in directory
for i in glob.glob(r'C:\Users\amaass\Documents\Informatica\*'):
    with open(i, encoding='utf8') as f:
        with open(r'C:\Users\amaass\Documents\Informatica\AAA_JSON_SCHEMA_DO_NOT_DELETE.txt', encoding='utf8') as j: #this prepends the json schema to the beginning of the file so each file is read properly
            data = j.read() + f.read()
    #this builds the file's name and its eventual destination paths, and checks whether it has already been processed (successfully or not) in the past
    filename = ntpath.basename(i)
    base_filepath = r'C:\Users\amaass\Documents\Informatica' + '\\' + filename
    success_filepath = r'C:\Users\amaass\Documents\Python ETLs\Successful_Tealium_Files_Archive' + '\\' + filename
    failure_filepath = r'C:\Users\amaass\Documents\Python ETLs\Unsuccessful_Tealium_Files_Archive' + '\\' + filename
    success_dir = path.exists(success_filepath)
    failure_dir = path.exists(failure_filepath)
    print(filename)
    print("success_dir is")
    print(success_dir)
    print("failure_dir is")
    print(failure_dir)
    if not success_dir and not failure_dir:
        try:
            #Creates empty dataframe that data will be dumped into
            df_ = pd.DataFrame(index = [])
            #structures data: inserts a comma after every closing brace except the last so the newline-delimited JSON objects can be wrapped in brackets and parsed as a single JSON array
            data = "[" + data.replace('}', '}, ', data.count('}')-1) + "]"
            #parses the string as JSON
            json_data = json.loads(data)
            #normalizes data to account for varying data structures and dumps it into a dataframe
            df1 = json_normalize(json_data)
            #dumps each individual dataframe into the master dataframe to push to sql
            df_ = df_.append(df1)
            #Selects desired columns from master dataframe
            df_push = pd.concat([df_['visitorid'].astype('object'),
                                 df_['eventid'].astype('object'),
                                 df_['pageurl_domain'].astype('object'),
                                 df_['pageurl_path'].astype('object'),
                                 df_['udo_tealium_timestamp_epoch'].astype('object'),
                                 df_['udo_ut_event'].astype('object'),
                                 df_['udo_page_type'].astype('object'),
                                 df_['udo_page_name'].astype('object'),
                                 df_['udo_event_name'].astype('object'),
                                 df_['firstpartycookies_accountname'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_accountguid'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_userguid'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_name'].astype('object').str.replace('"',''),
                                 df_['firstpartycookies_utag_main_ses_id'].astype('object'),
                                 df_['firstpartycookies_km_ni'].astype('object'),
                                 df_['firstpartycookies_km_ai'].astype('object')],
                                axis=1)
            #pandas stores missing values as float NaN, which does not insert cleanly through pyodbc. This replaces NaN with None, which is recognized as NULL by SQL Server
            df_nan = df_push.where((pd.notnull(df_push)), None)
            #converts the dataframe to a list of lists, one inner list per row
            reformat = df_nan.values.tolist()
            #This block of code establishes the connection to the InformaticaTest database on the SQL server
            cnxn = pyodbc.connect(
                'Driver={SQL Server Native Client 11.0};'
                'Server=Lab;'
                'Database=InformaticaTest;'
                'Trusted_Connection=yes;')
            cursor = cnxn.cursor()
            #This is the write statement to insert data into the table
            #Notes about 'stmt' can be found below
            stmt = '''INSERT INTO InformaticaTest.web.TealiumStaging (
                visitorid,
                eventid,
                pageurl_domain,
                pageurl_path,
                udo_tealium_timestamp_epoch,
                udo_ut_event,
                udo_page_type,
                udo_page_name,
                udo_event_name,
                firstpartycookies_accountname,
                firstpartycookies_accountguid,
                firstpartycookies_userguid,
                firstpartycookies_name,
                firstpartycookies_utag_main_ses_id,
                firstpartycookies_km_ni,
                firstpartycookies_km_ai
                )
                VALUES (
                ?,
                ?,
                LEFT(RTRIM(?), 250),
                LEFT(RTRIM(?), 250),
                ?,
                LEFT(RTRIM(?), 250),
                ?,
                ?,
                ?,
                LEFT(RTRIM(?), 250),
                LEFT(RTRIM(?), 250),
                ?,
                ?,
                try_cast(? as int),
                ?,
                ?
                )
                '''
            #STMT NOTES
            #A question mark is a placeholder for the value being inserted
            #The number of question marks must match the number of columns listed above
            #LEFT(RTRIM(?), 250) is used for columns whose values may exceed the 255-character limit of the varchar(255) columns
            #Use cast(? as nvarchar(max)) in place of LEFT(RTRIM(?), 250) for fields that still throw an error. referral_url is one such field - its values may exceed 1000 characters
            print('Transformed data')
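            #Optional sanity check, a sketch rather than part of the original flow: the number of '?'
            #placeholders in stmt should equal the number of columns selected into df_push, otherwise
            #executemany below will fail.
            assert stmt.count('?') == len(df_push.columns), 'placeholder/column count mismatch'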
            #BLOCK 4: LOAD - this block pushes the data into SQL
            try:
                #pushes data into sql based on the above statement. This needs its own try/except because if executemany fails and the error goes straight to the parent exception, the sql connection is never closed. The except below closes the connection before re-raising.
                cursor.executemany(stmt, reformat)
            except Exception:
                print('an error with cursor.executemany occurred. Check the datatypes of the file.')
                cnxn.close()
                raise
            print('Performed cursor.executemany')
            cnxn.commit()
            print('Committed')
            cnxn.close()
            print('Closed connection')
            print(filename + ' ingested successfully!')
            if filename != 'AAA_JSON_SCHEMA_DO_NOT_DELETE.txt':
                shutil.move(i, success_filepath)
        except Exception:
            #This block moves the file that threw an error to a separate directory so it can be handled later
            shutil.move(i, failure_filepath)
            print(filename)
            print('had issues')
    else:
        os.remove(base_filepath)
#BLOCK 5: CLEAN
#This code removes the files that have been processed.
for i in glob.glob(os.path.join(source_dir, '*.gz')):
    os.remove(i)
print('')
print('')
print('Great job Aaron! Time for some ping-pong, don\'t ya think?')
print('')
#BLOCK 6: REFERENCE FOR SQL CREATE TABLE
#SQL CREATE TABLE script for the staging table, kept below for reference
#
'''
create table InformaticaTest.web.TealiumEvents8 (
TealiumEventId int identity(1,1) not null,
VisitorId varchar(255) null,
firstpartycookies_accountname varchar(255) null,
firstpartycookies_accountguid varchar(255) null,
firstpartycookies_km_ni varchar(255) null,
firstpartycookies_km_ai varchar(255) null,
EventId varchar(255) null,
udo_tealium_timestamp_epoch varchar(255) null,
udo_ut_session_id varchar(255) null,
pageurl_domain varchar(255) null,
--dom_referrer varchar(255) null, --the values for this key sometimes exceed 255 characters. I have commented it out for now
udo_page_type varchar(255) null,
udo_page_name varchar(255) null,
udo_ut_event varchar(255) null,
udo_km_api_key varchar(255) null,
constraint PK_TealiumEvents6 primary key (TealiumEventId)
)

--use this to format epoch time as datetime
--DATEADD(ss, CAST (udo_tealium_timestamp_epoch AS int), '19700101') as new_date
'''
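#A Python-side sketch of the same epoch conversion, not used by this script: the SQL DATEADD note above
#treats udo_tealium_timestamp_epoch as whole seconds since 1970-01-01, and this mirrors that assumption.
def epoch_to_datetime(epoch_seconds):
    return datetime(1970, 1, 1) + timedelta(seconds=int(epoch_seconds))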