// dgadiraju
// 11/4/2017 - 3:09 PM
//
// cca175-problem-02-spark.scala

// Read the pipe-delimited products file and parse every line into a typed
// six-element tuple: (Int, Int, String, String, Float, String).
var products = sc.textFile("/user/cloudera/products").
  map(line => {
    // limit = -1 keeps trailing empty fields, so a record whose last column is
    // blank still yields six elements instead of failing on fields(5).
    val fields = line.split("\\|", -1)
    (fields(0).toInt, fields(1).toInt, fields(2), fields(3), fields(4).toFloat, fields(5))
  });

/**
 * Row type for one parsed product record.
 *
 * Fields are listed in the same order as the columns of the parsed tuple.
 */
case class Product(
  productID: Integer,
  productCatID: Integer,
  productName: String,
  productDesc: String,
  productPrice: Float,
  productImage: String
)
// Wrap each parsed tuple in a Product and convert the RDD to a DataFrame.
var productsDF = products.map {
  case (id, catId, name, desc, price, image) => Product(id, catId, name, desc, price, image)
}.toDF();

// Step 4 - Data Frame API
// For products priced below 100, compute per category: the highest price, the
// number of distinct product ids, the average price rounded to 2 decimals and
// the lowest price.
// The grouping/sorting column is productCatID: Product has no "productCategory"
// field, so that column name cannot be resolved against productsDF.
// NOTE(review): this variant sorts ascending while the Spark SQL and RDD
// variants of this step sort descending - confirm which order is intended.
import org.apache.spark.sql.functions._
var dataFrameResult = productsDF.
  filter("productPrice < 100").
  groupBy(col("productCatID")).
  agg(max(col("productPrice")).alias("max_price"),
      countDistinct(col("productID")).alias("tot_products"),
      round(avg(col("productPrice")),2).alias("avg_price"),
      min(col("productPrice")).alias("min_price")).
  orderBy(col("productCatID"));
dataFrameResult.show();

// Step 4 - Spark SQL:
// Same aggregation expressed in SQL over a temp table built from productsDF.
// The table's columns carry the Product field names (productID, productCatID,
// productPrice), so the query must reference those; the category column is
// aliased to product_category_id for the output.
productsDF.registerTempTable("products");
var sqlResult = sqlContext.sql(
  """select productCatID as product_category_id,
    |       max(productPrice) as maximum_price,
    |       count(distinct(productID)) as total_products,
    |       cast(avg(productPrice) as decimal(10,2)) as average_price,
    |       min(productPrice) as minimum_price
    |from products
    |where productPrice < 100
    |group by productCatID
    |order by product_category_id desc""".stripMargin);
sqlResult.show();

// Step 4 - RDD aggregateByKey:
// Keys each product by category id, keeps only prices below 100 (the same
// filter the Data Frame and Spark SQL variants apply), folds the prices of a
// category into (max, sum, count, min) and finally emits
// (category, max, average, count, min) sorted by category descending.
// Double.MinValue / Double.MaxValue are the identity elements for max / min.
// Going through .rdd makes the pair-RDD operations available regardless of
// what DataFrame.map returns in the running Spark version.
var rddResult = productsDF.rdd.
  map(row => (row(1).toString.toInt, row(4).toString.toDouble)).
  filter { case (_, price) => price < 100 }.
  aggregateByKey((Double.MinValue, 0.0, 0, Double.MaxValue))(
    (acc, price) => (math.max(acc._1, price), acc._2 + price, acc._3 + 1, math.min(acc._4, price)),
    (a, b) => (math.max(a._1, b._1), a._2 + b._2, a._3 + b._3, math.min(a._4, b._4))).
  map { case (category, (maxPrice, total, count, minPrice)) =>
    (category, maxPrice, total / count, count, minPrice)
  }.
  sortBy(_._1, ascending = false);
rddResult.collect().foreach(println);

// Step 5:
// Set spark.sql.avro.compression.codec to snappy, then write the three result
// sets as Avro, each to its own directory, in df / sql / rdd order.
import com.databricks.spark.avro._;
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
Seq(
  dataFrameResult -> "/user/cloudera/problem2/products/result-df",
  sqlResult -> "/user/cloudera/problem2/products/result-sql",
  rddResult.toDF() -> "/user/cloudera/problem2/products/result-rdd"
).foreach { case (result, targetDir) => result.write.avro(targetDir) }