package nyse
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by itversity on 28/03/17.
*/
object TopNStocksByVolume {

  /** Number of partitions the input is coalesced into; reduces task count when data is spread across many small files. */
  private val InputPartitions = 4

  /**
   * Spark job that, for every trade month, emits the stocks whose total monthly
   * volume is among the top N distinct volumes for that month (dense-rank
   * semantics: stocks tied on volume are all kept).
   *
   * Arguments:
   *  - args(0): input path of comma-separated stock records
   *  - args(1): output path; deleted first if it already exists
   *  - args(2): name of the config section whose `executionMode` is used as the Spark master
   *  - args(3): N, the number of distinct top volumes to keep per month
   *
   * Output: tab-delimited lines `tradeMonth \t volume \t ticker`, ordered by
   * trade month, then volume descending, then ticker.
   *
   * @throws IllegalArgumentException if fewer than 4 arguments are given or the input path does not exist
   * @throws NumberFormatException if args(3) is not an integer
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 4,
      "Usage: TopNStocksByVolume <inputPath> <outputPath> <configSection> <topN>")
    val inputPath = args(0)
    val outputPath = args(1)
    // Parse N once on the driver so a malformed argument fails fast rather than inside every task.
    val topN = args(3).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)
    try {
      // Use the Hadoop FileSystem API to validate input and output paths.
      val fs = FileSystem.get(sc.hadoopConfiguration)
      require(fs.exists(new Path(inputPath)), s"Input path $inputPath does not exist")
      // Only clear previous results once we know the input is usable.
      val output = new Path(outputPath)
      if (fs.exists(output))
        fs.delete(output, true)

      sc.textFile(inputPath).
        coalesce(InputPartitions).
        // ((tradeMonth, ticker), volume)
        map(parseRecord).
        // Total volume per stock per month.
        reduceByKey(_ + _).
        // Re-key by trade month: (tradeMonth, (volume, ticker)).
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        // (tradeMonth, Iterable((volume, ticker)))
        groupByKey().
        flatMap { case (month, stocks) =>
          topNByDenseRank(stocks, topN).map(stock => (month, stock))
        }.
        // Sort on the full composite key so the within-month volume order survives the shuffle.
        sortBy { case (month, (volume, ticker)) => (month, -volume, ticker) }.
        map { case (month, (volume, ticker)) => s"$month\t$volume\t$ticker" }.
        saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Parses one comma-separated record into ((tradeMonth, ticker), volume).
   * Uses column 0 as the ticker, the first 6 characters of column 1 as the
   * YYYYMM trade month, and column 6 as the volume. Volume is a Long so that
   * monthly sums cannot overflow.
   */
  private def parseRecord(rec: String): ((Int, String), Long) = {
    val fields = rec.split(",")
    ((fields(1).substring(0, 6).toInt, fields(0)), fields(6).toLong)
  }

  /**
   * Keeps every (volume, ticker) whose volume is among the `topN` largest
   * distinct volumes (simulates SQL dense_rank <= topN), ordered by volume
   * descending; ties keep their input order.
   */
  private def topNByDenseRank(stocks: Iterable[(Long, String)], topN: Int): List[(Long, String)] = {
    val byVolumeDesc = stocks.toList.sortBy(_._1)(Ordering[Long].reverse)
    val topVolumes = byVolumeDesc.map(_._1).distinct.take(topN).toSet
    byVolumeDesc.filter { case (volume, _) => topVolumes.contains(volume) }
  }
}