
pyspark-dataframes-operations-totalrevenueperday.py

import sys
import ConfigParser as cp  # Python 2 module; renamed to "configparser" in Python 3
try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row, functions as func

    print("Successfully imported Spark Modules")

    # Load application properties (the input and output paths below come
    # from the command line; the properties are not read further here)
    props = cp.RawConfigParser()
    props.read("src/main/resources/application.properties")

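    # Run Spark locally; lower spark.sql.shuffle.partitions from the
    # default of 200 to 2, which is plenty for this small dataset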
    conf = SparkConf().setAppName("Total Revenue Per Day").setMaster("local")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
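    # Input and output paths are passed as command-line arguments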
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]

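    # Reach into the JVM through the py4j gateway for the Hadoop FileSystem
    # API, used below to validate the input path and clean up old output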
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(Configuration())

    if not fs.exists(Path(inputPath)):
        print("Input path does not exist")
    else:
        # Remove any previous output so saveAsTextFile does not fail
        if fs.exists(Path(outputPath)):
            fs.delete(Path(outputPath), True)
        # Build a DataFrame for each dataset, splitting each CSV record only once
        ordersDF = sc.textFile(inputPath + "/orders"). \
            map(lambda rec: rec.split(",")). \
            map(lambda cols: Row(order_id=int(cols[0]),
                                 order_date=cols[1],
                                 order_customer_id=int(cols[2]),
                                 order_status=cols[3])). \
            toDF()
        orderItemsDF = sc.textFile(inputPath + "/order_items"). \
            map(lambda rec: rec.split(",")). \
            map(lambda cols: Row(order_item_id=int(cols[0]),
                                 order_item_order_id=int(cols[1]),
                                 order_item_product_id=int(cols[2]),
                                 order_item_quantity=int(cols[3]),
                                 order_item_subtotal=float(cols[4]),
                                 order_item_product_price=float(cols[5]))). \
            toDF()

        # Keep only COMPLETE orders, join them with their line items,
        # and total the item subtotals per order date
        ordersFiltered = ordersDF. \
            filter(ordersDF["order_status"] == "COMPLETE")
        ordersJoin = ordersFiltered. \
            join(orderItemsDF,
                 ordersFiltered["order_id"] == orderItemsDF["order_item_order_id"])

        ordersJoin. \
            groupBy(ordersJoin["order_date"]). \
            agg(func.sum(ordersJoin["order_item_subtotal"]).alias("total_revenue")). \
            sort("order_date"). \
            rdd. \
            saveAsTextFile(outputPath)

    print ("Successfully imported Spark Modules")

except ImportError as e:
    print("Cannot import Spark modules: " + str(e))
    sys.exit(1)
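
# Usage sketch (the dataset and output paths are illustrative):
#   spark-submit pyspark-dataframes-operations-totalrevenueperday.py \
#       /public/retail_db /user/dgadiraju/daily_revenue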