dgadiraju
11/5/2017 - 1:27 AM

Get inactive customers (customers who have never placed an order) using a left outer join from customers to orders, with DataFrames and Spark SQL.

/*
spark-shell --master yarn \
  --conf spark.ui.port=12345 \
  --num-executors 1 \
  --executor-cores 1 \
  --executor-memory 2G
*/

import scala.io.Source

// Read a local (driver-side) text file completely into memory and always
// release the file handle afterwards; Source.fromFile opens a file that
// getLines alone never closes. toList forces the read before close().
def readLocalLines(path: String): List[String] = {
  val source = Source.fromFile(path)
  try source.getLines().toList
  finally source.close()
}

// Raw comma-separated order records, distributed as an RDD of lines.
val ordersRaw = readLocalLines("/data/retail_db/orders/part-00000")
val ordersRDD = sc.parallelize(ordersRaw)

// Raw comma-separated customer records, distributed as an RDD of lines.
val customersRaw = readLocalLines("/data/retail_db/customers/part-00000")
val customersRDD = sc.parallelize(customersRaw)

// One-column DataFrame holding the customer id (third comma-separated field)
// of every order record.
val ordersDF = ordersRDD.
  map(order => order.split(",")(2).toInt).
  toDF("order_customer_id")

// Customer id, first name and last name taken from the first three
// comma-separated fields; each line is split once instead of per field.
val customersDF = customersRDD.
  map { customer =>
    val fields = customer.split(",")
    (fields(0).toInt, fields(1), fields(2))
  }.
  toDF("customer_id", "customer_fname", "customer_lname")

// Use a single partition for shuffles (the join and the ORDER BY below);
// the data set is small, so more partitions would only add overhead.
sqlContext.setConf("spark.sql.shuffle.partitions", "1")

// Make both DataFrames queryable from SQL under the table names the query uses.
Seq(ordersDF -> "orders_dg", customersDF -> "customers_dg").
  foreach { case (frame, tableName) => frame.registerTempTable(tableName) }

// After customers LEFT OUTER JOIN orders, order_customer_id is NULL only for
// customers with no matching order, i.e. the inactive customers.
val inactiveCustomersSql = Seq(
  "select customer_lname, customer_fname",
  "from customers_dg left outer join orders_dg",
  "on customer_id = order_customer_id",
  "where order_customer_id is null",
  "order by customer_lname, customer_fname"
).mkString(" ")

// Render each result row as "lname, fname" and write the sorted lines to HDFS.
val inactiveCustomerLines = sqlContext.sql(inactiveCustomersSql).rdd.map(_.mkString(", "))
inactiveCustomerLines.saveAsTextFile("/user/dgadiraju/solutions/solutions02/inactive_customers")