dgadiraju
3/7/2017 - 10:23 AM

scala-spark-topNProducts.scala

val l = ("Bike & Skate Shop", Iterable("933,42,Nike VR_S Covert Driver,,179.99,http://images.acmesports.sports/Nike+VR_S+Covert+Driver", 
"934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver", 
"935,42,TaylorMade RocketBallz Stage 2 Driver,,169.99,http://images.acmesports.sports/TaylorMade+RocketBallz+Stage+2+Driver", 
"936,42,Cleveland Golf Classic XL Driver,,119.99,http://images.acmesports.sports/Cleveland+Golf+Classic+XL+Driver", 
"937,42,Cobra AMP Cell Driver - Orange,,169.99,http://images.acmesports.sports/Cobra+AMP+Cell+Driver+-+Orange"))

def topNProducts(rec: (String, Iterable[String]), topN: Int): Iterable[(String, String)] = {
  rec._2.toList.sortBy(k => -k.split(",")(4).toFloat).take(topN).map(r => (rec._1, r))
}

val products = sc.textFile("/public/retail_db/products")
val productsFiltered = products.filter(rec => rec.split(",")(4) != "")
val productsMap = productsFiltered.map(rec => (rec.split(",")(1).toInt, rec))

val categories = sc.textFile("/public/retail_db/categories").
  map(rec => (rec.split(",")(0).toInt, rec.split(",")(2)))

val productsJoin = productsMap.
  join(categories).
  map(rec => (rec._2._2, rec._2._1))
val productsGroupByCategory = productsJoin.groupByKey()
productsGroupByCategory.
  flatMap(rec => topNProducts(rec, 3)).
  collect().
  foreach(println)