package retail.dataframes
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
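
// Case classes backing the orders and order_items DataFrames. They are not
// defined in this file, so they are assumed here; field names for columns the
// job does not reference follow the common retail_db schema.
case class Orders(
  order_id: Int,
  order_date: String,
  order_customer_id: Int,
  order_status: String)

case class OrderItems(
  order_item_id: Int,
  order_item_order_id: Int,
  order_item_product_id: Int,
  order_item_quantity: Int,
  order_item_subtotal: Float,
  order_item_product_price: Float)
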
object TotalRevenueDaily {
  def main(args: Array[String]): Unit = {
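    // Expected arguments:
    //   args(0) - input base directory (containing orders and order_items)
    //   args(1) - output directory
    //   args(2) - config block name whose executionMode property sets the master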
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Total Revenue - Daily - Data Frames").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
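    // Keep shuffle partitions low; the daily aggregate is a small dataset.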
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    
    import sqlContext.implicits._
    val inputPath = args(0)
    val outputPath = args(1)
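    // Validate the input path and remove any existing output directory so
    // saveAsTextFile does not fail.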
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))
    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }
    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }
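    // Read the comma-delimited orders data and convert it to a DataFrame.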
    val ordersDF = sc.textFile(inputPath + "/orders").
      map(rec => {
        val a = rec.split(",")
        Orders(a(0).toInt, a(1), a(2).toInt, a(3))
      }).toDF()
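    // Read the comma-delimited order_items data and convert it to a DataFrame.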
    val orderItemsDF = sc.textFile(inputPath + "/order_items").
      map(rec => {
        val a = rec.split(",")
        OrderItems(
          a(0).toInt,
          a(1).toInt,
          a(2).toInt,
          a(3).toInt,
          a(4).toFloat,
          a(5).toFloat)
      }).toDF()
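    // Keep only COMPLETE orders and join them with their order items on order id.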
    val ordersFiltered = ordersDF.
      filter(ordersDF("order_status") === "COMPLETE")
    val ordersJoin = ordersFiltered.join(orderItemsDF,
      ordersFiltered("order_id") === orderItemsDF("order_item_order_id"))
    ordersJoin.
      groupBy("order_date").
      agg(sum("order_item_subtotal")).
      sort("order_date").
      rdd.
      saveAsTextFile(outputPath)
  }
}