import sys
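# ConfigParser is the Python 2 module name (configparser in Python 3)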
import ConfigParser as cp
# Make sure the PySpark modules are importable before doing anything else
try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)
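
# sys.argv[3] names the section of application.properties to read; its executionMode value
# becomes the Spark master. An illustrative properties layout ("dev" and "local" are just
# example values, not taken from the repo):
#   [dev]
#   executionMode = local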
props = cp.RawConfigParser()
props.read("src/main/resources/application.properties")
conf = SparkConf().setAppName("Total Revenue Per Day").setMaster(props.get(sys.argv[3], "executionMode"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
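
# Command-line arguments: sys.argv[1] = input path, sys.argv[2] = output path,
# sys.argv[3] = properties section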
inputPath = sys.argv[1]
outputPath = sys.argv[2]
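
# Use the JVM gateway to reach the Hadoop FileSystem API for path checks on HDFS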
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(Configuration())

# Fail fast if the input path is missing; clear any previous output so the save can succeed
if not fs.exists(Path(inputPath)):
    print("Input path does not exist")
    sys.exit(1)
if fs.exists(Path(outputPath)):
    fs.delete(Path(outputPath), True)
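
# Parse the comma-delimited orders files into Rows and register the result as a temp table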
ordersDF = sc.textFile(inputPath + "/orders"). \
    map(lambda rec: rec.split(",")). \
    map(lambda cols: Row(order_id=int(cols[0]),
                         order_date=cols[1],
                         order_customer_id=int(cols[2]),
                         order_status=cols[3])). \
    toDF()
ordersDF.registerTempTable("orders")
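
# Do the same for order_items, which carries the revenue (order_item_subtotal) per line item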
orderItemsDF = sc.textFile(inputPath + "/order_items"). \
    map(lambda rec: rec.split(",")). \
    map(lambda cols: Row(order_item_id=int(cols[0]),
                         order_item_order_id=int(cols[1]),
                         order_item_product_id=int(cols[2]),
                         order_item_quantity=int(cols[3]),
                         order_item_subtotal=float(cols[4]),
                         order_item_product_price=float(cols[5]))). \
    toDF()
orderItemsDF.registerTempTable("order_items")
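
# Daily revenue: join completed orders to their line items and sum the subtotals per date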
sql = """select o.order_date, sum(oi.order_item_subtotal) daily_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status = 'COMPLETE'
group by o.order_date
order by o.order_date"""
totalRevenueDaily = sqlContext.sql(sql)
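
# Save as tab-delimited text: order_date<TAB>daily_revenue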
totalRevenueDaily.rdd. \
    map(lambda rec: rec["order_date"] + "\t" + str(rec["daily_revenue"])). \
    saveAsTextFile(outputPath)
print ("Successfully imported Spark Modules")
except ImportError as e:
print ("Can not import Spark Modules", e)
sys.exit(1)