pyspark-dataframes-operations-totalrevenueperdaysql.py
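# Total revenue per day from orders and order_items CSV data, computed with
# Spark SQL over DataFrames.
#
# Invocation sketch (the section name "dev" and the properties layout below
# are assumptions, not part of the original script):
#   spark-submit pyspark-dataframes-operations-totalrevenueperdaysql.py \
#       <inputPath> <outputPath> dev
# with src/main/resources/application.properties containing, e.g.:
#   [dev]
#   executionMode = local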

import sys
try:
    import configparser as cp   # Python 3
except ImportError:
    import ConfigParser as cp   # Python 2
try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row
    print("Successfully imported Spark Modules")

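    # Load environment-specific settings; sys.argv[3] selects the section
    # (e.g. dev or prod) whose executionMode value becomes the Spark master.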
    props = cp.RawConfigParser()
    props.read("src/main/resources/application.properties")

    conf = SparkConf(). \
        setAppName("Total Revenue Per Day"). \
        setMaster(props.get(sys.argv[3], "executionMode"))

    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
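    # Two shuffle partitions are plenty here; the Spark SQL default is 200.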
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    inputPath = sys.argv[1]
    outputPath = sys.argv[2]

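    # Reach the Hadoop FileSystem API through Py4J's JVM gateway so input and
    # output paths can be validated before running the job.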
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(Configuration())

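    # Fail fast if the input is missing; delete any existing output directory
    # so saveAsTextFile does not abort.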
    if not fs.exists(Path(inputPath)):
        print("Input path does not exist")
    else:
        if fs.exists(Path(outputPath)):
            fs.delete(Path(outputPath), True)
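
        # orders CSV layout: order_id,order_date,order_customer_id,order_status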
        ordersDF = sc.textFile(inputPath + "/orders"). \
            map(lambda rec: rec.split(",")). \
            map(lambda fields:
                Row(order_id=int(fields[0]),
                    order_date=fields[1],
                    order_customer_id=int(fields[2]),
                    order_status=fields[3])
            ).toDF()
        ordersDF.registerTempTable("orders")

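        # order_items CSV layout: order_item_id,order_item_order_id,
        # order_item_product_id,order_item_quantity,order_item_subtotal,
        # order_item_product_price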
        orderItemsDF = sc.textFile(inputPath + "/order_items"). \
            map(lambda rec: rec.split(",")). \
            map(lambda fields:
                Row(order_item_id=int(fields[0]),
                    order_item_order_id=int(fields[1]),
                    order_item_product_id=int(fields[2]),
                    order_item_quantity=int(fields[3]),
                    order_item_subtotal=float(fields[4]),
                    order_item_product_price=float(fields[5]))
            ).toDF()
        orderItemsDF.registerTempTable("order_items")

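        # Daily revenue: join orders to their items, keep only COMPLETE
        # orders, and sum the item subtotals per order_date.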
        sql = """select o.order_date, sum(oi.order_item_subtotal) daily_revenue
        from orders o join order_items oi 
        on o.order_id = oi.order_item_order_id
        where o.order_status = 'COMPLETE' 
        group by o.order_date 
        order by o.order_date"""

        totalRevenueDaily = sqlContext.sql(sql)

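        # Write tab-delimited lines: order_date<TAB>daily_revenue.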
        totalRevenueDaily.rdd. \
            map(lambda rec: rec["order_date"] + "\t" + str(rec["daily_revenue"])). \
            saveAsTextFile(outputPath)

    print ("Successfully imported Spark Modules")

except ImportError as e:
    print("Cannot import Spark Modules", e)
    sys.exit(1)