spark-scala-word-count-externalized.scala

package wordcount

import com.typesafe.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs._


object WordCount {
  // args(0) = input path, args(1) = output path, args(2) = config environment key (e.g. dev or prod)
  def main(args: Array[String]): Unit = {
    // Load application.properties/application.conf from the classpath (Typesafe Config)
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))

    // Print the resolved Spark configuration values for verification
    for(c <- conf.getAll)
      println(c._2)
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)

    // Use the Hadoop FileSystem API to validate the input and output paths
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if(!inputPathExists) {
      println("Invalid input path")
      return
    }

    // saveAsTextFile fails if the output path already exists, so delete it first
    if(outputPathExists)
      fs.delete(new Path(outputPath), true)

    // Word count: split each line into words, map each word to 1, and sum the counts per word
    val wc = sc.textFile(inputPath).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)

    // Convert each (word, count) tuple to tab-delimited text before saving the output
    // We could also use map(rec => rec._1 + "\t" + rec._2)
    wc.
      map(rec => rec.productIterator.mkString("\t")).
      saveAsTextFile(outputPath)

    sc.stop()
  }
}
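
For reference, a minimal sketch of how the externalized configuration and a launch command might look. The code only requires that a properties file on the driver classpath contain a block named by args(2) with an executionMode key; the file name, the dev/prod keys, the master values, and the jar name below are illustrative assumptions, not part of the original gist.

application.properties (assumed):

    dev.executionMode = local
    prod.executionMode = yarn-client

Possible invocation (arguments are input path, output path, environment key):

    spark-submit --class wordcount.WordCount wordcount-example.jar <input-path> <output-path> dev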