import sys
import configparser as cp  # renamed from ConfigParser in Python 3

# Import the Spark modules up front so a missing PySpark installation fails fast
try:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext, Row, functions as func
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Cannot import Spark Modules", e)
    sys.exit(1)

# Application properties; read here so environment-specific settings are available
props = cp.RawConfigParser()
props.read("src/main/resources/application.properties")

conf = SparkConf().setAppName("Total Revenue Per Day").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "2")
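# spark.sql.shuffle.partitions defaults to 200; two partitions are plenty for a
# small dataset like this and also cap the number of output files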

inputPath = sys.argv[1]
outputPath = sys.argv[2]

# Hadoop FileSystem classes reached through the Py4J gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
# Use the SparkContext's Hadoop configuration (it honors fs.defaultFS and any
# spark.hadoop.* settings) rather than a blank Configuration object
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
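# Note: sc._gateway and sc._jsc are private PySpark attributes; reaching the
# Hadoop FileSystem API this way is common, but it is not a public interface
# and may change between Spark releases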

if not fs.exists(Path(inputPath)):
    print("Input path does not exist")
else:
    # Remove any previous output so saveAsTextFile does not fail
    if fs.exists(Path(outputPath)):
        fs.delete(Path(outputPath), True)
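
    # Build DataFrames from the raw comma-delimited text files: split each
    # record once, then map the fields onto a Row with names and types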
    ordersDF = sc.textFile(inputPath + "/orders"). \
        map(lambda rec: rec.split(",")). \
        map(lambda cols: Row(order_id=int(cols[0]),
                             order_date=cols[1],
                             order_customer_id=int(cols[2]),
                             order_status=cols[3])). \
        toDF()
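    # Illustrative orders record (retail_db layout assumed here):
    #   1,2013-07-25 00:00:00.0,11599,CLOSED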

    orderItemsDF = sc.textFile(inputPath + "/order_items"). \
        map(lambda rec: rec.split(",")). \
        map(lambda cols: Row(order_item_id=int(cols[0]),
                             order_item_order_id=int(cols[1]),
                             order_item_product_id=int(cols[2]),
                             order_item_quantity=int(cols[3]),
                             order_item_subtotal=float(cols[4]),
                             order_item_product_price=float(cols[5]))). \
        toDF()
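    # Illustrative order_items record (same assumed layout):
    #   1,1,957,1,299.98,299.98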

    # Only COMPLETE orders contribute to revenue
    ordersFiltered = ordersDF. \
        filter(ordersDF["order_status"] == "COMPLETE")

    ordersJoin = ordersFiltered. \
        join(orderItemsDF,
             ordersFiltered["order_id"] == orderItemsDF["order_item_order_id"])

    # Sum the line-item subtotals per day and write one date,revenue line per
    # row instead of the default Row(...) string representation
    ordersJoin. \
        groupBy(ordersJoin["order_date"]). \
        agg(func.sum(ordersJoin["order_item_subtotal"]).alias("daily_revenue")). \
        sort("order_date"). \
        rdd. \
        map(lambda row: str(row[0]) + "," + str(row[1])). \
        saveAsTextFile(outputPath)
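    # On Spark 2.0+ the RDD round-trip could be avoided by keeping the result
    # as a DataFrame and calling .write.csv(outputPath) on it instead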
print ("Successfully imported Spark Modules")
except ImportError as e:
print ("Can not import Spark Modules", e)
sys.exit(1)
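
# Example invocation (script name and paths are illustrative):
#   spark-submit src/main/python/TotalRevenuePerDay.py data/retail_db output/daily_revenue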