package retail

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by itversity on 27/03/17.
 */
object DailyRevenuePerDayPerDepartment {
  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))
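    // Assumes a config block keyed by args(2), e.g. in application.properties:
    //   dev.executionMode = local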
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)
    // Use the HDFS FileSystem API to validate the input and output paths
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Invalid input path")
      return
    }

    if (outputPathExists)
      fs.delete(new Path(outputPath), true)
    // Joining categories and departments
    // Generate (K, V) and (K, W) pairs where
    // K = department_id, V = department_name, W = category_id
    val departments = sc.textFile(inputPath + "/departments")
    val categories = sc.textFile(inputPath + "/categories")
    val cdjoin = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1))).
      join(categories.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt)))
    // After join the RDD will have elements of type
    // (K, (V, W)) = (department_id, (department_name, category_id))
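    // Illustrative, assuming retail_db-style rows: departments record "2,Fitness"
    // and categories record "1,2,Football" join to (2, ("Fitness", 1))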
    // Joining products with cdjoin
    // (category_id, department_name) from cdjoin is joined with
    // (product_category_id, product_id) from products
    val products = sc.textFile(inputPath + "/products")
    val cdpjoin = cdjoin.map(rec => (rec._2._2, rec._2._1)).
      join(products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt)))
    // Output after join will be
    // (category_id, (department_name, product_id))
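    // Illustrative, with hypothetical values: cdjoin element (1, "Fitness") and a
    // product whose category_id is 1 and product_id is 93 join to (1, ("Fitness", 93))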
    // Getting (order_item_product_id, (order_item_order_id, order_item_subtotal))
    // from order_items
    val orderItems = sc.
      textFile(inputPath + "/order_items").
      map(rec => (rec.split(",")(2).toInt, (rec.split(",")(1).toInt, rec.split(",")(4).toDouble)))
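    // Illustrative, assuming a retail_db-style row: "1,1,957,1,299.98,299.98"
    // maps to (957, (1, 299.98))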
    // Joining (order_item_product_id, (order_item_order_id, order_item_subtotal)) from order_items
    // with (product_id, department_name) from cdpjoin
    val cdpojoin = cdpjoin.map(rec => (rec._2._2, rec._2._1)).
      join(orderItems)
    // Output after join will be
    // (product_id, (department_name, (order_item_order_id, order_item_subtotal)))
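    // Illustrative, with a hypothetical department for product 957:
    // (957, ("Fitness", (1, 299.98)))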
    // Getting (order_id, order_date) for completed or closed orders
    val orders = sc.textFile(inputPath + "/orders").
      filter(rec => rec.split(",")(3) == "COMPLETE" || rec.split(",")(3) == "CLOSED").
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
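    // Illustrative, assuming a retail_db-style row: "1,2013-07-25 00:00:00.0,11599,CLOSED"
    // maps to (1, "2013-07-25 00:00:00.0")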
    // Joining (order_item_order_id, (department_name, order_item_subtotal)) from cdpojoin
    // with (order_id, order_date) from orders
    val cdpoojoin = cdpojoin.
      map(rec => (rec._2._2._1, (rec._2._1, rec._2._2._2))).
      join(orders)
    // Output after join
    // (order_id, ((department_name, order_item_subtotal), order_date))
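    // Illustrative, continuing the hypothetical values above:
    // (1, (("Fitness", 299.98), "2013-07-25 00:00:00.0"))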
    // Get data in this format with order_date and department_name as key
    // ((order_date, department_name), order_item_subtotal)
    // Use reduceByKey to aggregate the data using order_date and department_name as key
    // Use sortByKey to sort the data by date and then department_name
    // Use map to transform data to tab delimited and then save to file in text format
    cdpoojoin.
      map(rec => ((rec._2._2, rec._2._1._1), rec._2._1._2)).
      reduceByKey(_ + _).
      sortByKey().
      map(rec => rec._1._1 + "\t" + rec._1._2 + "\t" + rec._2).
      saveAsTextFile(outputPath)
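    // Each output line looks like (revenue value illustrative):
    //   2013-07-25 00:00:00.0\tFitness\t5948.94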
  }
}