# Get top N customers by revenue for each day
# COMPLETE and CLOSED orders
# Use Spark SQL
from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext, Row
import sys
# Parse CLI arguments. Fail fast with a usage message instead of a
# cryptic IndexError when arguments are missing.
if len(sys.argv) < 5:
    print("Usage: <script> <executionMode> <topN> <inputBaseDir> <outputDir>")
    sys.exit(1)
executionMode = sys.argv[1]   # Spark master, e.g. local[*] or yarn-client
topN = int(sys.argv[2])       # number of top customers to keep per day
inputBaseDir = sys.argv[3]    # assumed to end with '/' (paths are concatenated directly) -- TODO confirm
outputDir = sys.argv[4]       # destination directory for the JSON output
conf = SparkConf(). \
    setAppName("Top " + str(topN) + " customers per day"). \
    setMaster(executionMode)
sc = SparkContext(conf=conf)
# HiveContext gives access to window functions (rank() over ...) used below.
sqlContext = HiveContext(sc)
def parseOrder(line):
    # Split the CSV record once instead of once per field (the original
    # lambda called line.split(",") four times per record).
    fields = line.split(",")
    return Row(order_id=int(fields[0]),
               order_date=fields[1],
               order_customer_id=int(fields[2]),
               order_status=fields[3])

# Load orders as a DataFrame and expose it to Spark SQL.
ordersRDD = sc.textFile(inputBaseDir + "orders")
ordersDF = ordersRDD.map(parseOrder).toDF()
ordersDF.registerTempTable("orders")
# sqlContext.sql("select * from orders").show()
def parseOrderItem(line):
    # Split the CSV record once instead of once per field (the original
    # lambda called line.split(",") six times per record).
    fields = line.split(",")
    return Row(order_item_id=int(fields[0]),
               order_item_order_id=int(fields[1]),
               order_item_product_id=int(fields[2]),
               order_item_quantity=int(fields[3]),
               order_item_subtotal=float(fields[4]),
               order_item_product_price=float(fields[5]))

# Load order line items as a DataFrame and expose them to Spark SQL.
orderItemsRDD = sc.textFile(inputBaseDir + "order_items")
orderItemsDF = orderItemsRDD.map(parseOrderItem).toDF()
orderItemsDF.registerTempTable("order_items")
def parseCustomer(line):
    # Split the CSV record once instead of once per field (the original
    # lambda called line.split(",") nine times per record).
    fields = line.split(",")
    return Row(customer_id=int(fields[0]),
               customer_fname=fields[1],
               customer_lname=fields[2],
               customer_email=fields[3],
               customer_password=fields[4],
               customer_street=fields[5],
               customer_city=fields[6],
               customer_state=fields[7],
               customer_zipcode=fields[8])

# Load customers as a DataFrame and expose them to Spark SQL.
# Path built without a leading slash, consistent with the orders and
# order_items loads above (the original "/customers" produced a double
# slash since inputBaseDir must already end with "/").
customers = sc.textFile(inputBaseDir + "customers")
customersDF = customers.map(parseCustomer).toDF()
customersDF.registerTempTable("customers")
# Keep shuffle parallelism small; the default (200) is overkill here.
sqlContext.setConf("spark.sql.shuffle.partitions", "2")

# Revenue per customer per day, restricted to COMPLETE/CLOSED orders.
# Registered as a temp table so the ranking query below can read it.
dailyRevenueSQL = ("select order_date, order_customer_id, "
                   "sum(order_item_subtotal) daily_revenue_per_customer "
                   "from orders o join order_items oi "
                   "on o.order_id = oi.order_item_order_id "
                   "where order_status in ('COMPLETE', 'CLOSED') "
                   "group by order_date, order_customer_id")
dailyRevenueDF = sqlContext.sql(dailyRevenueSQL)
dailyRevenueDF.registerTempTable("daily_revenue_per_customer")
# Rank customers within each day by revenue (rank() window function,
# available via HiveContext), keep the top N, and attach customer names.
topNSQL = ("select order_date, "
           "concat(concat(customer_fname, ', '), customer_lname) customer_name, "
           "daily_revenue_per_customer from "
           "(select order_date, order_customer_id, "
           "daily_revenue_per_customer, "
           "rank() over (partition by order_date order by daily_revenue_per_customer desc) rnk "
           "from daily_revenue_per_customer) q join customers c "
           "on c.customer_id = q.order_customer_id "
           "where rnk <= " + str(topN) + " "
           "order by order_date, rnk")
topNCustomersPerDay = sqlContext.sql(topNSQL)
# Spark 1.x DataFrame.save; on Spark 1.4+ the equivalent call is
# topNCustomersPerDay.write.json(outputDir).
topNCustomersPerDay.save(outputDir, "json")