# 8/2/2017 - 9:32 AM
"""Compute the revenue of each product for one month and save it as text.

Command-line arguments:
    sys.argv[1]  input base path; must contain ``orders`` and ``order_items``
    sys.argv[2]  output path; removed before the result is written
    sys.argv[3]  month string to look for in the order date field
    sys.argv[4]  local base path containing ``products/part-00000``
    sys.argv[5]  properties section holding the ``executionMode`` key
    sys.argv[6]  (optional) path of the properties file to load

Output lines are the product name and its revenue, separated by a tab.
"""

import sys

try:
    import ConfigParser as cp  # Python 2 module name
except ImportError:
    import configparser as cp  # same module under its Python 3 name


def order_in_month(order, month):
    """Return True if the second field of an orders line contains *month*."""
    return month in order.split(",")[1]


def order_id_pair(order):
    """Map an orders line to ``(order_id, 1)`` so it can be used as a join key."""
    return (int(order.split(",")[0]), 1)


def order_item_pair(orderItem):
    """Map an order_items line to ``(order_id, (product_id, revenue))``.

    Uses field 1 as the order id, field 2 as the product id and field 4 as
    the revenue amount; the line is split only once.
    """
    fields = orderItem.split(",")
    return (int(fields[1]), (int(fields[2]), float(fields[4])))


def product_pair(product):
    """Map a products line to ``(product_id, product_name)``."""
    fields = product.split(",")
    return (int(fields[0]), fields[2])


def format_revenue(rec):
    """Format a joined ``(product_id, (name, revenue))`` record as text."""
    return rec[1][0] + "\t" + str(rec[1][1])


try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row, functions as func

    props = cp.RawConfigParser()
    # NOTE(review): the script as found never loads a properties file, so the
    # executionMode lookup below always raised NoSectionError. An optional
    # sixth argument is accepted as the properties file path; confirm the
    # intended location with the job owner.
    if len(sys.argv) > 6:
        props.read(sys.argv[6])

    conf = SparkConf().setAppName("Total Revenue Per Day").setMaster(props.get(sys.argv[5], "executionMode"))

    sc = SparkContext(conf=conf)
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]
    month = sys.argv[3]

    # Hadoop classes are reached through the py4j gateway of the SparkContext.
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(Configuration())

    if not fs.exists(Path(inputPath)):
        print("Input path does not exists")
    else:
        # Only touch the output once the input is known to exist; the
        # recursive delete clears any earlier result at the output path.
        fs.delete(Path(outputPath), True)

        # Filter for orders which fall in the month passed as argument
        orders = inputPath + "/orders"
        ordersFiltered = (
            sc.textFile(orders)
            .filter(lambda order: order_in_month(order, month))
            .map(order_id_pair)
        )

        # Join filtered orders and order_items to get order_item details for a given month
        # Get revenue for each product_id
        orderItems = inputPath + "/order_items"
        revenueByProductId = (
            sc.textFile(orderItems)
            .map(order_item_pair)
            .join(ordersFiltered)
            # Drop the order id and the marker, keeping (product_id, revenue).
            .map(lambda rec: rec[1][0])
            .reduceByKey(lambda total, ele: total + ele)
        )

        # We need to read products from local file system
        localPath = sys.argv[4]
        with open(localPath + "/products/part-00000") as productsFile:
            products = productsFile.read().splitlines()

        # Convert into RDD and extract product_id and product_name
        # Join it with aggregated order_items (product_id, revenue)
        # Get product_name and revenue for each product
        (
            sc.parallelize(products)
            .map(product_pair)
            .join(revenueByProductId)
            .map(format_revenue)
            # The original chain ended without an action, so nothing was ever
            # computed or stored; write the result to the output path.
            .saveAsTextFile(outputPath)
        )

    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)