spark-scala-DailyRevenuePerDayPerDepartment.scala

package retail

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by itversity on 27/03/17.
  */
object DailyRevenuePerDayPerDepartment {
  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)
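
    // Expected arguments (an illustrative invocation; the properties file layout
    // shown here is an assumption, not part of this file):
    //   args(0) = base input directory holding departments, categories, products,
    //             order_items and orders sub-directories
    //   args(1) = output directory (deleted first if it already exists)
    //   args(2) = environment key looked up in application.properties, e.g.
    //             dev.executionMode = local
    //             prod.executionMode = yarn-client
    // e.g. spark-submit --class retail.DailyRevenuePerDayPerDepartment <app jar> \
    //        /public/retail_db /user/training/daily_revenue dev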

    // Use the HDFS FileSystem API to validate the input and output paths
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if(!inputPathExists) {
      println("Invalid input path: " + inputPath)
      return
    }

    if(outputPathExists)
      fs.delete(new Path(outputPath), true)

    // Joining categories and departments
    // Generate (K, V) and (K, W) pair where
    // K = department_id, V = department_name, W = category_id
    val departments = sc.textFile(inputPath + "/departments")
    val categories = sc.textFile(inputPath + "/categories")
    val cdjoin = departments.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1))).
      join(categories.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt)))
    // After join the RDD will have elements of type
    // (K, (V, W)) = (department_id, (department_name, category_id))
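    // Illustrative sample (hypothetical retail_db style records): a departments
    // record "2,Fitness" and a categories record "3,2,Baseball & Softball"
    // produce the joined element (2, ("Fitness", 3))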

    // Joining products with cdjoin
    // (category_id, department_name) from cdjoin is joined with
    // (product_category_id, product_id) from products
    val products = sc.textFile(inputPath + "/products")
    val cdpjoin = cdjoin.map(rec => (rec._2._2, rec._2._1)).
      join(products.map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt)))
    // Output after join will be
    // (category_id, (department_name, product_id))
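    // Illustrative sample: (2, ("Fitness", 3)) from above becomes (3, "Fitness"),
    // which joined with a hypothetical products record "100,3,..." mapped to
    // (3, 100) yields (3, ("Fitness", 100))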

    // Getting (order_item_product_id, (order_item_order_id, order_item_subtotal))
    // from order_items
    val orderItems = sc.
      textFile(inputPath + "/order_items").
      map(rec => (rec.split(",")(2).toInt, (rec.split(",")(1).toInt, rec.split(",")(4).toDouble)))
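
    // Illustrative sample: a hypothetical order_items record
    // "1,1,100,1,129.99,129.99" maps to (100, (1, 129.99))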

    // Joining (order_item_product_id, (order_item_order_id, order_item_subtotal)) from order_items
    // with (product_id, department_name) from cdpjoin
    val cdpojoin = cdpjoin.map(rec => (rec._2._2, rec._2._1)).
      join(orderItems)
    // Output after join
    // (product_id, (department_name, (order_item_order_id, order_item_subtotal)))
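    // Illustrative sample: (3, ("Fitness", 100)) becomes (100, "Fitness"), which
    // joined with the order item (100, (1, 129.99)) yields
    // (100, ("Fitness", (1, 129.99)))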

    // Getting (order_id, order_date) for completed or closed orders
    val orders = sc.textFile(inputPath + "/orders").
      filter(rec => rec.split(",")(3) == "COMPLETE" || rec.split(",")(3) == "CLOSED").
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
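    // Illustrative sample: a hypothetical orders record
    // "1,2013-07-25 00:00:00.0,11599,CLOSED" passes the filter and maps to
    // (1, "2013-07-25 00:00:00.0")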

    // Joining (order_id, order_date) with
    // (order_item_order_id, (department_name, order_item_subtotal))
    val cdpoojoin = cdpojoin.
      map(rec => (rec._2._2._1, (rec._2._1, rec._2._2._2))).
      join(orders)
    // Output after join
    // (order_id, ((department_name, order_item_subtotal), order_date))
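    // Illustrative sample: (100, ("Fitness", (1, 129.99))) becomes
    // (1, ("Fitness", 129.99)), which joined with (1, "2013-07-25 00:00:00.0")
    // yields (1, (("Fitness", 129.99), "2013-07-25 00:00:00.0"))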

    // Get data in this format with order_date and department_name as key
    // ((order_date, department_name), order_item_subtotal)
    // Use reduceByKey to aggregate the data using order_date and department_name as key
    // Use sortByKey to sort the data by date and then department_name
    // Use map to transform data to tab delimited and then save to file in text format
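    // Illustrative sample: (1, (("Fitness", 129.99), "2013-07-25 00:00:00.0"))
    // maps to (("2013-07-25 00:00:00.0", "Fitness"), 129.99); after reduceByKey,
    // sortByKey and the final map, the saved line would look like
    // 2013-07-25 00:00:00.0<TAB>Fitness<TAB><aggregated revenue>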
    cdpoojoin.
      map(rec => ((rec._2._2, rec._2._1._1), rec._2._1._2)).
      reduceByKey(_ + _).
      sortByKey().
      map(rec => rec._1._1 + "\t" + rec._1._2 + "\t" + rec._2).
      saveAsTextFile(outputPath)
  }

}