spark-scala-DailyRevenuePerDayPerDepartmentHive.scala

package retail

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Created by itversity on 27/03/17.
  * build.sbt
name := "doc"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "com.typesafe" % "config" % "1.3.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"

  * spark-submit (arguments: the output path on HDFS, then the environment key, e.g. prod)
spark-submit --class retail.DailyRevenuePerDayPerDepartmentHive \
--master yarn \
--conf spark.ui.port=25613 \
doc_2.10-1.0.jar /user/dgadiraju/DailyRevenuePerDayPerDepartmentHive prod
  */
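
/*
 * application.properties (an assumption; the file is not shown in the original):
 * ConfigFactory.load() looks up args(1) in a Typesafe Config file on the
 * classpath (src/main/resources). A minimal sketch of what this job expects:
 *
 * dev.executionMode = local
 * prod.executionMode = yarn-client
 */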

object DailyRevenuePerDayPerDepartmentHive {
  def main(args: Array[String]): Unit = {
    // args(1) is the environment key (e.g. dev or prod); its executionMode
    // property supplies the Spark master.
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(1)).getString("executionMode"))
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // args(0) is the target directory on HDFS; delete it if it already
    // exists so re-runs do not fail in saveAsTextFile.
    val outputPath = args(0)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(new Path(outputPath)))
      fs.delete(new Path(outputPath), true)

    sqlContext.sql("use doc")
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")

    sqlContext.sql("select o.order_date, d.department_name, sum(oi.order_item_subtotal) order_revenue " +
      "from departments d join categories c on d.department_id = c.category_department_id " +
      "join products p on c.category_id = p.product_category_id " +
      "join order_items oi on p.product_id = oi.order_item_product_id " +
      "join orders o on oi.order_item_order_id = o.order_id " +
      "where o.order_status in ('COMPLETE', 'CLOSED') " +
      "group by o.order_date, d.department_name " +
      "order by o.order_date, d.department_name").
      rdd.
      map(rec => rec.mkString("\t")).
      saveAsTextFile(outputPath)
  }
}
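
/*
 * For reference, a sketch of the same aggregation written against the
 * Spark 1.6 DataFrame API instead of SQL (assuming the same Hive tables
 * in the "doc" database; not part of the submitted job):
 *
 * import org.apache.spark.sql.functions.{col, sum}
 *
 * val revenue = sqlContext.table("departments").
 *   join(sqlContext.table("categories"),
 *     col("department_id") === col("category_department_id")).
 *   join(sqlContext.table("products"),
 *     col("category_id") === col("product_category_id")).
 *   join(sqlContext.table("order_items"),
 *     col("product_id") === col("order_item_product_id")).
 *   join(sqlContext.table("orders").
 *     where(col("order_status").isin("COMPLETE", "CLOSED")),
 *     col("order_item_order_id") === col("order_id")).
 *   groupBy("order_date", "department_name").
 *   agg(sum("order_item_subtotal").alias("order_revenue")).
 *   orderBy("order_date", "department_name")
 *
 * revenue.rdd.map(_.mkString("\t")).saveAsTextFile(outputPath)
 */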