# dgadiraju
# 12/5/2017 - 3:39 AM
#
# OrdersJoinOrderItems.py

from pyspark import SparkConf, SparkContext
import sys

# Find all orders that have no matching order_items records.
#
# Usage: OrdersJoinOrderItems.py <spark-master> <input-path> <output-path>
# NOTE(review): <input-path> is concatenated directly with "orders" /
# "order_items", so it must end with a path separator — confirm callers.
conf = SparkConf().setAppName("Orders Join Order Items").setMaster(sys.argv[1])
sc = SparkContext(conf=conf)

inputPath = sys.argv[2]
orders = sc.textFile(inputPath + "orders")
orderItems = sc.textFile(inputPath + "order_items")

# Key each order record by order_id (first CSV field).
# "1,2013-07-25 00:00:00.0,11599,CLOSED" -> (1, u'1,2013-07-25 00:00:00.0,11599,CLOSED')
ordersMap = orders. \
    map(lambda o: (int(o.split(",")[0]), o))

# Key each order-item record by its order_id (second CSV field).
# "1,1,957,1,299.98,299.98" -> (1, u'1,1,957,1,299.98,299.98')
orderItemsMap = orderItems. \
    map(lambda oi: (int(oi.split(",")[1]), oi))

# Left outer join keeps every order; the order-item side of the value pair
# is None when the order has no matching order_items record, e.g.:
# (1, (u'1,2013-07-25 00:00:00.0,11599,CLOSED', u'1,1,957,1,299.98,299.98'))
# (6, (u'6,2013-07-25 00:00:00.0,7130,COMPLETE', None))
ordersLeftOuterJoin = ordersMap.leftOuterJoin(orderItemsMap)

# BUG FIX: in this leftOuterJoin the ORDER is rec[1][0] and the (possibly
# absent) order item is rec[1][1]. The original filtered on rec[1][0],
# which is never None in a left outer join, so the job always produced an
# empty output. Filter on the order-item side and emit the order record.
# (Also: identity comparison `is None` instead of `== None`.)
ordersWithNoOrderItems = ordersLeftOuterJoin. \
    filter(lambda rec: rec[1][1] is None). \
    map(lambda rec: rec[1][0])

outputPath = sys.argv[3]
ordersWithNoOrderItems.saveAsTextFile(outputPath)