def load_dataframe_to_mysql(db_cur, df, schema, table, how='append', backup=True, sep='|', remove_csv=True):
"""
Args:
if_exist: append, replace
backup: True, False
"""
if how == 'append':
load_data(table)
elif how == 'replace':
temp_table = 'temp_'+table
backup_table = table+'_bak'
create_temp_table()
load_data(temp_table)
if backup:
drop_table(backup_table)
rename_table(table, backup_table)
rename_table(temp_table, table)
drop_table(table)
def load_data(to_table):
csv = '{table}.csv'.format(table=table)
df.to_csv(csv, sep=sep, header=False, index=False)
sql = """
LOAD DATA INFILE '{csv}'
INTO TABLE {schema}.{table}
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\\n'
(columns)
""".format(csv=csv, schema=schema, table=to_table, columns=df.columns)
db_cur.execute(sql)
if remove_csv:
os.remove(csv)
def create_temp_table():
sql = """
DROP TABLE {schema}.temp_{table} IF EXIST;
CREATE TABLE {schema}.temp_{table} AS {schema}.{table}
""".format(schema=schema, table=table)
db_cur.execute(sql)
def drop_table(drop_table):
sql = """
DROP TABLE {schema}.{table} IF EXIST
""".format(shcema=schema, table=drop_table)
db_cur.execute(sql)
def rename_table(old_table, new_table):
sql = """
DROP TABLE {schema}.{new_table} IF EXIST;
RENAME
"""
db_cur.rename(sql)