// Load the raw catalog and parse each pipe-delimited line into a typed
// 6-tuple: (id, categoryId, name, description, price, imageUrl).
// split('|') takes a Char, so no regex-escaping of '|' is needed.
var products = sc.textFile("/user/cloudera/products").map { line =>
  val f = line.split('|')
  (f(0).toInt, f(1).toInt, f(2), f(3), f(4).toFloat, f(5))
}
/**
 * Immutable record for one catalog product; field order mirrors the parsed
 * tuple from the source file (id, category id, name, description, price,
 * image URL), so positional construction stays valid.
 */
case class Product(
  productID: Integer,
  productCatID: Integer,
  productName: String,
  productDesc: String,
  productPrice: Float,
  productImage: String)
var productsDF = products.map(x=> Product(x._1,x._2,x._3,x._4,x._5,x._6)).toDF();
// Step 4 - Data Frame API
import org.apache.spark.sql.functions._
// Per-category price stats (max / distinct count / rounded avg / min) for
// products cheaper than 100.
// BUG FIX: the DataFrame has no "productCategory" column — the case-class
// field (and therefore the column) is "productCatID"; the original
// groupBy/orderBy would fail with an AnalysisException at runtime.
var dataFrameResult = productsDF.
  filter("productPrice < 100").
  groupBy(col("productCatID")).
  agg(
    max(col("productPrice")).alias("max_price"),
    countDistinct(col("productID")).alias("tot_products"),
    round(avg(col("productPrice")), 2).alias("avg_price"),
    min(col("productPrice")).alias("min_price")).
  orderBy(col("productCatID"));
dataFrameResult.show();
// Step 4 - Spark SQL:
productsDF.registerTempTable("products");
// Same aggregation as the DataFrame version, expressed in SQL.
// BUG FIX: the temp table's columns come from the Product case class
// (productCatID, productID, productPrice) — the original query used
// snake_case names (product_category_id, ...) that do not exist. The output
// aliases are kept unchanged.
var sqlResult = sqlContext.sql(
  "select productCatID, " +
  "max(productPrice) as maximum_price, " +
  "count(distinct(productID)) as total_products, " +
  "cast(avg(productPrice) as decimal(10,2)) as average_price, " +
  "min(productPrice) as minimum_price " +
  "from products where productPrice < 100 " +
  "group by productCatID order by productCatID desc");
sqlResult.show();
// Step 4 - RDD aggregateByKey:
// Accumulator per category: (maxPrice, priceSum, count, minPrice).
// FIXES: (1) apply the same "price < 100" filter used by the DataFrame and
// SQL versions above so all three implementations produce comparable
// results — the original skipped it; (2) seed the min slot with
// Double.MaxValue instead of the magic literal 9999999999999.0.
// Row indices: row(1) = productCatID, row(4) = productPrice.
var rddResult = productsDF.
  map(row => (row(1).toString.toInt, row(4).toString.toDouble)).
  filter(_._2 < 100).
  aggregateByKey((0.0, 0.0, 0, Double.MaxValue))(
    (acc, price) =>
      (math.max(acc._1, price), acc._2 + price, acc._3 + 1, math.min(acc._4, price)),
    (a, b) =>
      (math.max(a._1, b._1), a._2 + b._2, a._3 + b._3, math.min(a._4, b._4))).
  map { case (catId, (maxP, sum, cnt, minP)) => (catId, maxP, sum / cnt, cnt, minP) }.
  sortBy(_._1, false);
rddResult.collect().foreach(println);
// Step 5: persist all three result sets as snappy-compressed Avro files.
import com.databricks.spark.avro._
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
dataFrameResult.write.avro("/user/cloudera/problem2/products/result-df")
sqlResult.write.avro("/user/cloudera/problem2/products/result-sql")
rddResult.toDF().write.avro("/user/cloudera/problem2/products/result-rdd")