dgadiraju
3/28/2017 - 5:42 AM

spark-scala-TopNStocksByVolume.scala

package nyse

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by itversity on 28/03/17.
  *
  * Spark job that reports, for every trade month, the stocks whose total
  * traded volume is among the top N distinct monthly volumes (dense rank:
  * stocks tied on volume share a rank and are all reported).
  *
  * Program arguments:
  *   - args(0): input path of comma separated records
  *   - args(1): output path; deleted recursively first if it already exists
  *   - args(2): name of the configuration section whose `executionMode`
  *              entry is used as the Spark master
  *   - args(3): N, the number of distinct top volumes to keep per month
  *
  * Output lines are tab delimited: trade month, total volume, stock ticker.
  */
object TopNStocksByVolume {

  /** Number of partitions the input is coalesced into, so that data spread
    * across many small files is processed by fewer tasks. */
  private val InputPartitions = 4

  private val Usage =
    "Usage: TopNStocksByVolume <inputPath> <outputPath> <environment> <topN>"

  /**
    * Job entry point.
    *
    * @param args see the argument list documented on the enclosing object
    * @throws IllegalArgumentException if fewer than four arguments are given
    *                                  or the input path matches nothing
    * @throws NumberFormatException    if args(3) is not an integer
    */
  def main(args: Array[String]): Unit = {
    require(args.length >= 4, Usage)
    val inputPath = args(0)
    val outputPath = args(1)
    val environment = args(2)
    // Parsed once on the driver: a malformed value fails before any cluster
    // work starts, and the closures below capture an Int rather than `args`.
    val topN = args(3).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(environment).getString("executionMode"))
    val sc = new SparkContext(conf)

    try {
      // The HDFS FileSystem API is used to validate the input and output paths.
      val fs = FileSystem.get(sc.hadoopConfiguration)

      // globStatus yields null for a missing plain path and an empty array for
      // a glob without matches, so plain paths and patterns are both covered.
      val inputPathExists =
        Option(fs.globStatus(new Path(inputPath))).exists(_.nonEmpty)
      // Checked before the old output is removed, so a wrong input path does
      // not destroy the results of a previous run.
      require(inputPathExists, s"Input path does not exist: $inputPath")

      val output = new Path(outputPath)
      if (fs.exists(output))
        fs.delete(output, true) // true = recursive

      sc.textFile(inputPath).
        coalesce(InputPartitions).
        // ((trade month, ticker), volume)
        map(parseRecord).
        // total volume of each stock for each month
        reduceByKey(_ + _).
        // re-key by trade month: (trade month, (volume, ticker))
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        // (trade month, all (volume, ticker) pairs of that month)
        groupByKey().
        // keep the dense-rank top N stocks of every month
        flatMap { case (month, stocks) =>
          topNByVolume(stocks, topN).map(stock => (month, stock))
        }.
        // sort the data by trade month
        sortByKey().
        // format data to be tab delimited
        map { case (month, (volume, ticker)) => s"$month\t$volume\t$ticker" }.
        saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
    * Parses one comma separated input record.
    *
    * Field 0 is used as the stock ticker, the first six characters of field 1
    * as the trade month (presumably field 1 is a yyyyMMdd date - confirm
    * against the data set) and field 6 as the traded volume.
    *
    * The volume is read as a Long because monthly totals can exceed
    * Int.MaxValue and would otherwise overflow silently when summed.
    *
    * @return ((trade month, ticker), volume)
    */
  private def parseRecord(rec: String): ((Int, String), Long) = {
    val fields = rec.split(",")
    ((fields(1).substring(0, 6).toInt, fields(0)), fields(6).toLong)
  }

  /**
    * Dense-rank top N selection over the (volume, ticker) pairs of one month.
    *
    * @param stocks (total volume, ticker) pairs of a single trade month
    * @param topN   number of distinct volumes to keep
    * @return the pairs whose volume is one of the `topN` highest distinct
    *         volumes, ordered by descending volume
    */
  private def topNByVolume(stocks: Iterable[(Long, String)],
                           topN: Int): List[(Long, String)] = {
    // Stable descending sort; ties keep their incoming order.
    val byVolumeDesc = stocks.toList.sortBy(_._1)(Ordering[Long].reverse)
    val topVolumes = byVolumeDesc.map(_._1).distinct.take(topN).toSet
    byVolumeDesc.filter(stock => topVolumes.contains(stock._1))
  }
}