// dgadiraju
// 5/20/2017 - 3:31 PM
//
// spark-scala-join-operations.scala

// Root directory of the retail_db dataset. Pick the location that exists in your
// environment; the alternative is a local copy:
//   val path = "/Users/itversity/Research/data/retail_db"
val path = "/public/retail_db"

// Load both datasets as pair RDDs so they can be joined on a common key.
// orders are keyed by their first comma-separated field, order_items by their
// second field (presumably the order id in both cases — confirm against the
// retail_db schema). The value is the untouched text line.
val orders = sc.textFile(s"$path/orders").
  map { line =>
    val orderId = line.split(",")(0).toInt
    (orderId, line)
  }

val orderItems = sc.textFile(s"$path/order_items").
  map { line =>
    val orderId = line.split(",")(1).toInt
    (orderId, line)
  }

// Inner join: only keys present in both RDDs survive; each match becomes
// (key, (orderLine, orderItemLine)). Print a 10-record sample on the driver.
val ordersJoin = orders.join(orderItems)
for (joined <- ordersJoin.take(10)) println(joined)

// Left outer join: every order is kept; the right side is Some(orderItemLine)
// when a matching order item exists and None otherwise.
val ordersLeftOuter = orders.leftOuterJoin(orderItems)

// Orders with no matching order item (right side is None).
val ordersWithoutItems = ordersLeftOuter.filter { case (_, (_, item)) => item.isEmpty }

// Sample of the full joined records: (key, (orderLine, None)).
ordersWithoutItems.take(10).foreach(println)

// Sample of just the order lines for those orders.
ordersWithoutItems.
  map { case (_, (order, _)) => order }.
  take(10).
  foreach(println)

// cogroup: for each key appearing in either RDD, gather all matching order lines
// and all matching order item lines into two Iterables. A key missing on one side
// gets an empty Iterable there. Print a 10-record sample on the driver.
val ordersCogroup = orders.cogroup(orderItems)
for (grouped <- ordersCogroup.take(10)) println(grouped)

// Cartesian product: every element of `a` paired with every element of `b`
// (4 x 2 = 8 pairs here).
val a = sc.parallelize(List(1, 2, 3, 4))
val b = sc.parallelize(List("Hello", "World"))
// RDD.foreach(println) runs on the executors, so on a cluster its output would not
// appear in the driver console. The result is tiny, so collect it to the driver first.
a.cartesian(b).collect().foreach(println)