// dgadiraju
// 5/29/2017 - 10:26 AM
//
// spark-scala-groupByKey-denserank.scala

// Root directory of the retail_db data set.
// Alternative location: val path = "/public/retail_db"
val path = "/Users/itversity/Research/data/retail_db"

// Read the products data set as an RDD of raw comma-separated lines.
// NOTE(review): `sc` is not defined in this file; presumably the SparkContext
// that spark-shell provides.
val products = sc.textFile(s"$path/products")

// Pair each product line with its category id (field 1, parsed as Int) and gather
// the lines of every category together. Lines whose field 4 is empty are dropped
// first. Trailing dots keep the multi-line chain pasteable into the REPL.
val productsGroupByCategory = products.
  filter(line => line.split(",")(4) != "").
  map { line =>
    val fields = line.split(",")
    (fields(1).toInt, line)
  }.
  groupByKey()

// Exploring plain Scala collection APIs on a single category before moving the
// logic to Spark: print every product of that category whose price is among the
// 5 highest distinct prices.
val i = productsGroupByCategory.first()._2
val l = i.toList

// Field 4 of a comma-separated product record, parsed as the price.
val priceOf = (rec: String) => rec.split(",")(4).toFloat

// The 5 highest distinct prices, in descending order.
val topNPrices = l.map(priceOf).
  sortBy(price => -price).
  distinct.
  take(5)

// Walk the products from most to least expensive and print those whose price
// made it into topNPrices.
val productsByPriceDesc = l.sortBy(rec => -priceOf(rec))
for (rec <- productsByPriceDesc if topNPrices.contains(priceOf(rec))) println(rec)

//Getting top 5 priced products using Spark and Scala

// Keeps the records whose price is among the `topN` highest distinct prices
// (a dense rank on price).
//   records: comma-separated product records; field 4 is parsed with toFloat,
//            so a non-numeric value there throws NumberFormatException.
//   topN:    number of distinct price levels to keep (defaults to 5).
// Returns the matching records ordered by price, highest first; records that
// share a price keep their input order because sortBy is stable.
def topNPricedProducts(records: Iterable[String], topN: Int = 5): List[String] = {
  // Parse every price exactly once and carry it alongside its record.
  val pricedRecords = records.toList.map(record => (record.split(",")(4).toFloat, record))
  val topNPrices = pricedRecords.
    map(_._1).
    sortBy(price => -price).
    distinct.
    take(topN)
  pricedRecords.
    sortBy(pricedRecord => -pricedRecord._1).
    filter(pricedRecord => topNPrices.contains(pricedRecord._1)).
    map(_._2)
}

// Apply the per-category selection to every (categoryId, products) pair, then
// bring the surviving records to the driver and print them.
productsGroupByCategory.
  flatMap(categoryAndProducts => topNPricedProducts(categoryAndProducts._2)).
  collect().
  foreach(println)