// dgadiraju
// 5/21/2017 - 11:24 AM

// spark-scala-aggregations.scala

// Root directory of the retail_db dataset. Keep exactly one definition active.
// Local copy:
val path = "/Users/itversity/Research/data/retail_db"
// Alternative shared location -- swap the comment markers to use it instead:
// val path = "/public/retail_db"

// (order_id, subtotal) pairs parsed from the order_items files under `path`.
// Column 1 is the order id and column 4 the item subtotal; each line is split once.
// Subtotals are parsed as Double: summing Floats, or widening them into a Double
// accumulator such as aggregateByKey's (0.0, 0), prints visible rounding noise.
// NOTE(review): assumes comma-separated rows with no header row; a non-numeric
// field makes toInt / toDouble throw NumberFormatException.
val orderItems = sc.textFile(path + "/order_items").
  map { orderItem =>
    val fields = orderItem.split(",")
    (fields(1).toInt, fields(4).toDouble)
  }

// Revenue per order: add together every subtotal that shares an order id, then
// print the first 100 (order_id, revenue) pairs brought back to the driver.
for (orderRevenue <- orderItems.reduceByKey(_ + _).take(100)) println(orderRevenue)

// Revenue and item count per order via aggregateByKey.
// The zero value (0.0, 0) is the starting (revenue, itemCount) accumulator.
orderItems.
  aggregateByKey((0.0, 0))(
    // seqOp: fold one subtotal into an accumulator.
    { case ((revenue, itemCount), subtotal) => (revenue + subtotal, itemCount + 1) },
    // combOp: merge two partial accumulators for the same order id.
    { case ((revenueA, countA), (revenueB, countB)) => (revenueA + revenueB, countA + countB) }
  ).
  take(100).
  foreach(println)

// Revenue and item count per order using reduceByKey.
// Each line becomes (order_id, (subtotal, 1)); reduceByKey then sums both tuple
// fields per order id. The line is split once, and subtotals are parsed as Double so
// the totals accumulate with the same Double precision as an aggregateByKey run
// seeded with (0.0, 0), instead of drifting in Float.
// NOTE(review): assumes comma-separated rows with no header row; a non-numeric
// field makes toInt / toDouble throw NumberFormatException.
sc.textFile(path + "/order_items").
  map { orderItem =>
    val fields = orderItem.split(",")
    (fields(1).toInt, (fields(4).toDouble, 1))
  }.
  reduceByKey { case ((revenueA, countA), (revenueB, countB)) =>
    (revenueA + revenueB, countA + countB)
  }.
  take(100).
  foreach(println)