// dgadiraju
// 3/29/2017 - 7:17 AM
//
// spark-scala-Accumulators-and-Broadcast-variables-TopNStocksByVolumeWithName.scala

package nyse

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by itversity on 28/03/17.
  *
  * Spark job that, for every trade month, finds the stocks whose total monthly
  * volume is among the top N distinct volumes (dense-rank semantics: ties share
  * a rank) and writes them as tab-delimited text with the stock name looked up
  * from a broadcast symbol-to-name map.
  *
  * Expected arguments:
  *   0. inputPath        - comma-delimited trade records; field 0 is the stock
  *                         ticker, field 1 the trade date (its first six
  *                         characters are used as YYYYMM) and field 6 the volume
  *   1. stockSymbolsPath - tab-delimited file: ticker, stock name
  *   2. outputPath       - output directory; it is deleted first if it exists
  *   3. environment      - name of the config section that holds "executionMode"
  *   4. topN             - how many distinct top volumes to keep per month
  *
  * Output line format: tradeMonth, stock name (or ticker when the name is
  * unknown), monthly volume - separated by tabs and sorted by trade month.
  */
object TopNStocksByVolumeWithName {

  private val Usage =
    "Usage: TopNStocksByVolumeWithName " +
      "<inputPath> <stockSymbolsPath> <outputPath> <environment> <topN>"

  def main(args: Array[String]): Unit = {
    require(args.length >= 5, Usage)

    val inputPath = args(0)
    val stockSymbolsPath = args(1)
    val outputPath = args(2)
    val environment = args(3)
    // Parsed once on the driver so that a malformed value fails fast instead
    // of failing inside every task.
    val topN = args(4).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(environment).getString("executionMode"))
    val sc = new SparkContext(conf)

    try {
      // We need to use HDFS FileSystem API to perform validations on input and output path
      val fs = FileSystem.get(sc.hadoopConfiguration)

      // Validate the inputs BEFORE touching the output directory, so that a
      // wrong input path does not wipe the result of a previous run.
      require(pathExists(fs, inputPath), s"Input path does not exist: $inputPath")
      require(pathExists(fs, stockSymbolsPath),
        s"Stock symbols path does not exist: $stockSymbolsPath")

      val outputDir = new Path(outputPath)
      if (fs.exists(outputDir))
        fs.delete(outputDir, true)

      // coalesce is used to reduce number of tasks to process data spread across
      // many small files
      val data = sc.textFile(inputPath).
        coalesce(4)

      // ticker -> stock name; lines without a second tab-separated field are skipped
      val stockSymbols = sc.textFile(stockSymbolsPath).
        map(_.split("\t")).
        filter(_.length >= 2).
        map(fields => (fields(0), fields(1))).
        collectAsMap()

      val bv = sc.broadcast(stockSymbols)

      // NOTE: these accumulators are updated inside transformations, so Spark
      // may apply an update more than once if a task or stage is re-executed;
      // treat the totals as diagnostics, not exact counts.
      val totalRecords = sc.accumulator(0, "Total number of records")
      val noTradedRecords = sc.accumulator(0, "Number of records that are not traded")
      val noOfTopNRecords = sc.accumulator(0, "Number of records fall under top n records")

      data.
        // Get date in YYYYMM format and stock ticker as key and volume as value
        map(rec => {
          totalRecords += 1
          val a = rec.split(",")
          // Long, not Int: a month's summed volume can exceed Int.MaxValue
          val volume = a(6).toLong
          if (volume == 0L)
            noTradedRecords += 1
          ((a(1).substring(0, 6).toInt, a(0)), volume)
        }).
        // Aggregate and get volume for each stock for each month
        reduceByKey(_ + _).
        // Move stock ticker to value, now key is trade month
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        // Group by trade month
        // Output will be (trademonth, Iterable((volume, stockticker)))
        groupByKey().
        // Process the list to compute topN stocks by volume for each key
        // This simulates dense rank functionality
        flatMap { case (month, volumesAndTickers) =>
          val monthly = volumesAndTickers.toList

          // get topN distinct volumes
          val topNVolumes = monthly.
            map(_._1).
            distinct.
            sortBy(-_).
            take(topN).
            toSet

          // Keep every stock whose volume falls in topNVolumes, highest first
          monthly.
            sortBy(r => -r._1).
            filter(r => topNVolumes.contains(r._1)).
            map(r => (month, r))
        }.
        // sort the data by trade month
        sortByKey().
        // format data to be tab delimited
        map { case (month, (volume, ticker)) =>
          noOfTopNRecords += 1
          // fall back to the ticker when no name is known for it
          val name = bv.value.getOrElse(ticker, ticker)
          s"$month\t$name\t$volume"
        }.
        saveAsTextFile(outputPath)
    } finally {
      // Always release the Spark resources, even when validation or the job fails
      sc.stop()
    }
  }

  /**
    * Returns true when `location` matches at least one existing path.
    * globStatus is used instead of exists so that glob patterns, which
    * sc.textFile accepts, are validated correctly as well.
    */
  private def pathExists(fs: FileSystem, location: String): Boolean =
    Option(fs.globStatus(new Path(location))).exists(_.nonEmpty)
}