scala-spark-dataframes-operations-totalrevenueperday.scala
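
// Spark application that computes total revenue per day from retail orders and
// order_items data, using the DataFrame API (SQLContext-based, i.e. Spark 1.x style).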

package retail.dataframes

import com.typesafe.config.ConfigFactory

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
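
// The Orders and OrderItems case classes used by toDF() below are not defined in this
// file; presumably they live elsewhere in the retail.dataframes package. A minimal
// sketch is included here so the file compiles on its own. Field names that are never
// referenced below (order_customer_id, order_item_id, order_item_product_id,
// order_item_quantity, order_item_product_price) are assumptions inferred from the
// positional conversions in the map() calls.
case class Orders(
  order_id: Int,
  order_date: String,
  order_customer_id: Int,
  order_status: String)

case class OrderItems(
  order_item_id: Int,
  order_item_order_id: Int,
  order_item_product_id: Int,
  order_item_quantity: Int,
  order_item_subtotal: Float,
  order_item_product_price: Float)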

object TotalRevenueDaily {
  def main(args: Array[String]): Unit = {
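    // args(0) - input path containing the orders and order_items subdirectories
    // args(1) - output path for the aggregated results
    // args(2) - key in the typesafe config (application.conf) whose executionMode
    //           property supplies the Spark master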
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Total Revenue - Daily - Data Frames").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Lower spark.sql.shuffle.partitions (default 200) to 2 to avoid many small shuffle tasks
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")
    
    import sqlContext.implicits._

    val inputPath = args(0)
    val outputPath = args(1)

    // Validate paths on the underlying file system (HDFS or local)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    // Exit if the input path is missing
    if (!inputPathExists) {
      println("Input path does not exist")
      return
    }

    // Delete any pre-existing output directory so saveAsTextFile does not fail
    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val ordersDF = sc.textFile(inputPath + "/orders").
      map(rec => {
        val a = rec.split(",")
        Orders(a(0).toInt, a(1).toString(), a(2).toInt, a(3).toString())
      }).toDF()

    val orderItemsDF = sc.textFile(inputPath + "/order_items").
      map(rec => {
        val a = rec.split(",")
        OrderItems(
          a(0).toInt,
          a(1).toInt,
          a(2).toInt,
          a(3).toInt,
          a(4).toFloat,
          a(5).toFloat)
      }).toDF()

    // Keep only COMPLETE orders and join them with their items on the order id
    val ordersFiltered = ordersDF.
      filter(ordersDF("order_status") === "COMPLETE")
    val ordersJoin = ordersFiltered.join(orderItemsDF,
      ordersFiltered("order_id") === orderItemsDF("order_item_order_id"))

    // Total revenue per day: group by order_date, sum the item subtotals, sort by date,
    // and save as text (each output line is the Row's toString representation)
    ordersJoin.
      groupBy("order_date").
      agg(sum("order_item_subtotal")).
      sort("order_date").
      rdd.
      saveAsTextFile(outputPath)
  }
}
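
// A typical invocation might look like the following; the jar name, config key ("dev")
// and paths are illustrative assumptions, not part of this file:
//
//   spark-submit --class retail.dataframes.TotalRevenueDaily \
//     retail-dataframes.jar /path/to/retail_db /path/to/daily_revenue dev
//
// with application.conf containing, for example:
//
//   dev.executionMode = "local"
//   prod.executionMode = "yarn-client"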