# Problem Statement: Get daily revenue per customer
# Usage: spark-submit <this script> <master> <input path> <customers file> <output path>
from pyspark import SparkConf, SparkContext
import sys
conf = SparkConf().setAppName("Orders Join Order Items").setMaster(sys.argv[1])
sc = SparkContext(conf=conf)
# Read the orders and order_items datasets
# inputPath must end with "/" since the dataset names are concatenated to it
inputPath = sys.argv[2]
orders = sc.textFile(inputPath + "orders")
orderItems = sc.textFile(inputPath + "order_items")
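# The parsing below assumes the standard retail_db CSV layouts (field names
# are an assumption; only the positions actually used here are relied on):
#   orders:      order_id,order_date,order_customer_id,order_status
#   order_items: field 2 is the order id, field 5 the item subtotal (revenue)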
# Join orders, order_items and customers
# To join two RDDs we first need to convert each into (key, value) tuples
# Start by joining orders with order_items on the order id
# ordersMap: (order_id, (order_date, customer_id))
ordersMap = orders.map(lambda o: o.split(",")).map(lambda o: (int(o[0]), (o[1], int(o[2]))))
# orderItemsMap: (order_id, order_item_subtotal)
orderItemsMap = orderItems.map(lambda oi: oi.split(",")).map(lambda oi: (int(oi[1]), float(oi[4])))
ordersJoin = ordersMap.join(orderItemsMap)
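# After the join, each element of ordersJoin has the shape
#   (order_id, ((order_date, customer_id), order_item_subtotal))
# e.g. (1, (("2013-07-25 00:00:00.0", 11599), 299.98))  -- hypothetical values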
# Get revenue per day per customer id
# Read customers locally on the driver and broadcast the lookup,
# so matching customer names becomes a map-side (broadcast) join
customersPath = sys.argv[3]
with open(customersPath) as customersFile:
    customers = customersFile.read().splitlines()
# customersMap: customer_id -> "first_name last_name"
customersMap = dict(map(lambda c: (int(c.split(",")[0]), c.split(",")[1] + " " + c.split(",")[2]), customers))
customersBV = sc.broadcast(customersMap)
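# customersBV.value is a plain dict available on every executor,
# e.g. {11599: "Mary Malone", ...}  -- hypothetical entry for illustration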
# Uncomment to inspect a sample of the joined records:
# for i in ordersJoin.take(10): print(i)
# Key each record by (order date, customer name) using the broadcast lookup,
# then sum the subtotals per key
revenuePerDatePerCustId = ordersJoin. \
    map(lambda o: ((o[1][0][0], customersBV.value[o[1][0][1]]), o[1][1])). \
    reduceByKey(lambda t, v: t + v)
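# reduceByKey merges all subtotals that share the same (date, name) key, e.g.
#   (("2013-07-25", "Mary Malone"), 299.98) and (("2013-07-25", "Mary Malone"), 129.99)
#   reduce to (("2013-07-25", "Mary Malone"), 429.97)  -- hypothetical values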
# Format each record as tab-separated text and save to the output path
revenuePerDatePerCustId. \
    map(lambda rec: rec[0][0] + "\t" + rec[0][1] + "\t" + str(rec[1])). \
    saveAsTextFile(sys.argv[4])
# Final output: date(tab)customer name(tab)revenue
# Customer name is computed from first name and last name, separated by a space
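# A typical invocation might look like the following (script name and paths
# are hypothetical; the output directory must not already exist):
#   spark-submit daily_revenue_per_customer.py local[2] \
#       /data/retail_db/ /data/retail_db/customers/part-00000 /data/output/daily_revenue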