# Author: dgadiraju
# 12/6/2017 - 3:21 AM
#
# DailyRevenuePerCustomer.py

# Problem Statement: Get daily revenue per customer

from pyspark import SparkConf, SparkContext
import sys

# Build the Spark context; the master URL comes from the first CLI argument.
sparkConf = SparkConf().setAppName("Orders Join Order Items").setMaster(sys.argv[1])
sc = SparkContext(conf=sparkConf)

# Load the raw orders and order_items text datasets from the base input path
# given as the second CLI argument (presumably ends with a path separator —
# the dataset names are appended directly).
inputPath = sys.argv[2]
orders = sc.textFile(inputPath + "orders")
orderItems = sc.textFile(inputPath + "order_items")

# Join orders with order_items and prepare a broadcast customer lookup.
#
# Presumed CSV layouts (retail_db-style — confirm against the data):
#   orders:      order_id,order_date,order_customer_id,order_status
#   order_items: item_id,order_id,product_id,quantity,subtotal,product_price
#
# To join two RDDs they must be keyed tuples; key both on order_id.
# Split each line ONCE instead of re-splitting for every field (the original
# called .split(",") up to three times per record).
ordersMap = orders. \
    map(lambda o: o.split(",")). \
    map(lambda parts: (int(parts[0]), (parts[1], int(parts[2]))))

orderItemsMap = orderItems. \
    map(lambda oi: oi.split(",")). \
    map(lambda parts: (int(parts[1]), float(parts[4])))

# Result shape: (order_id, ((order_date, customer_id), item_subtotal))
ordersJoin = ordersMap.join(orderItemsMap)

# Customers is a small lookup table: read it on the driver and broadcast a
# {customer_id: "first last"} dict so executors resolve names locally
# instead of via a third shuffle join.
customersPath = sys.argv[3]
# with-statement ensures the file handle is closed (the original leaked it).
with open(customersPath) as customersFile:
    customers = customersFile.read().splitlines()
customersMap = {
    int(parts[0]): parts[1] + " " + parts[2]
    for parts in (c.split(",") for c in customers)
}
customersBV = sc.broadcast(customersMap)
# Debug peek at the joined RDD:
# for i in ordersJoin.take(10): print(i)

# Re-key each joined record as ((order_date, customer name), subtotal),
# resolving the name through the broadcast dict, then sum subtotals per key.
revenuePerDatePerCustId = ordersJoin \
    .map(lambda rec: ((rec[1][0][0], customersBV.value[rec[1][0][1]]), rec[1][1])) \
    .reduceByKey(lambda total, amount: total + amount)

# Final output layout: date(tab)customer name(tab)revenue — the customer name
# was assembled from first and last name when the broadcast dict was built.
# Written as text files under the path given by the fourth CLI argument.
revenuePerDatePerCustId \
    .map(lambda rec: rec[0][0] + "\t" + rec[0][1] + "\t" + str(rec[1])) \
    .saveAsTextFile(sys.argv[4])