package nyse
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * For every trade month, emits the stocks whose total monthly volume is among the top N
 * distinct volumes for that month (dense-rank semantics: stocks tied on volume share a rank),
 * replacing ticker symbols with names where the symbols file provides a mapping.
 *
 * Created by itversity on 28/03/17.
 */
object TopNStocksByVolumeWithName {

  /**
   * Job entry point.
   *
   * @param args
   *   - args(0): input path of comma-delimited records; field 0 is the ticker, the first six
   *     characters of field 1 are parsed as the trade month, field 6 is the volume
   *   - args(1): tab-delimited file mapping ticker (field 0) to name (field 1)
   *   - args(2): output directory; deleted first if it already exists
   *   - args(3): name of the config section whose `executionMode` is used as the Spark master
   *   - args(4): N, the number of distinct top volumes to keep per month
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 5,
      "Usage: TopNStocksByVolumeWithName <inputPath> <stockSymbolsPath> <outputPath> <env> <topN>")
    val inputPath = args(0)
    val stockSymbolsPath = args(1)
    val outputPath = args(2)
    val environment = args(3)
    // Parsed once on the driver; the original re-parsed it inside the closure for every group.
    val topN = args(4).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(environment).getString("executionMode"))
    val sc = new SparkContext(conf)

    try {
      // Resolve the FileSystem from the path itself (not the default FS) so a fully-qualified
      // output URI on another file system is handled. A missing input path is not checked here;
      // it surfaces as an error from textFile once the job runs.
      val output = new Path(outputPath)
      val fs = output.getFileSystem(sc.hadoopConfiguration)
      if (fs.exists(output))
        fs.delete(output, true)

      val stockSymbols = sc.textFile(stockSymbolsPath).
        map { rec =>
          val fields = rec.split("\t")
          (fields(0), fields(1))
        }.
        collectAsMap()
      val bv = sc.broadcast(stockSymbols)

      // NOTE: these are updated inside transformations, so Spark may apply a task's update more
      // than once if the task or stage is re-executed; treat the counts as approximate.
      val totalRecords = sc.accumulator(0, "Total number of records")
      val noTradedRecords = sc.accumulator(0, "Number of records that are not traded")
      val noOfTopNRecords = sc.accumulator(0, "Number of records fall under top n records")

      val monthlyVolumes = sc.textFile(inputPath).
        // coalesce reduces the number of tasks when the data is spread across many small files
        coalesce(4).
        // Key: (trade month as YYYYMM, ticker); value: volume.
        map { rec =>
          totalRecords += 1
          val fields = rec.split(",")
          // Long, not Int: summed monthly volumes can exceed Int.MaxValue and would wrap silently.
          val volume = fields(6).toLong
          if (volume == 0L)
            noTradedRecords += 1
          ((fields(1).substring(0, 6).toInt, fields(0)), volume)
        }.
        // Total volume for each stock for each month
        reduceByKey(_ + _)

      monthlyVolumes.
        // Re-key by trade month so all stocks of a month are grouped together
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        groupByKey().
        flatMap { case (month, stocks) =>
          denseRankTopN(stocks, topN).map(stock => (month, stock))
        }.
        // Sort the output by trade month
        sortByKey().
        // Tab-delimited: month, stock name (or ticker if unmapped), volume
        map { case (month, (volume, ticker)) =>
          noOfTopNRecords += 1
          val name = bv.value.getOrElse(ticker, ticker)
          s"$month\t$name\t$volume"
        }.
        saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Keeps every (volume, ticker) pair whose volume is among the `n` largest distinct volumes,
   * ordered by descending volume. This is dense-rank <= n: tied volumes all survive and do not
   * consume extra ranks. Returns an empty list when `n <= 0`.
   */
  private def denseRankTopN(stocks: Iterable[(Long, String)], n: Int): List[(Long, String)] = {
    // sortBy is stable, so tickers with equal volume keep their incoming relative order.
    val byVolumeDesc = stocks.toList.sortBy(_._1)(Ordering[Long].reverse)
    val topVolumes = byVolumeDesc.map(_._1).distinct.take(n).toSet
    byVolumeDesc.filter { case (volume, _) => topVolumes.contains(volume) }
  }
}