spark-sql-application.py
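
Spark SQL examples against the retail_db data set (Spark 1.x pyspark shell, with sc and sqlContext predefined): convert RDDs to DataFrames using Row, register temporary tables, and compute daily revenue per product by joining the Hive orders and order_items tables with a products temp table.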

from pyspark.sql import Row

# Build a DataFrame from the comma-delimited orders data and register it as a temp table
ordersRDD = sc.textFile("/public/retail_db/orders")
ordersDF = ordersRDD. \
    map(lambda o: Row(order_id=int(o.split(",")[0]), order_date=o.split(",")[1],
                      order_customer_id=int(o.split(",")[2]), order_status=o.split(",")[3])). \
    toDF()
ordersDF.registerTempTable("ordersDF_table")

# Count of orders by status
sqlContext.sql("select order_status, count(1) from ordersDF_table group by order_status").show()

sqlContext.sql("use dgadiraju_retail_db_txt")
from pyspark.sql import Row
productsRaw = open("/data/retail_db/products/part-00000").read().splitlines()
productsRDD = sc.parallelize(productsRaw)
productsDF = productsRDD.\
map(lambda p: Row(product_id=int(p.split(",")[0]), product_name=p.split(",")[2])).\
toDF()
productsDF.registerTempTable("products")

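# Preview the three tables that take part in the revenue query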
sqlContext.sql("select * from products").show()
sqlContext.sql("select * from orders").show()
sqlContext.sql("select * from order_items").show()

# Reduce shuffle partitions from the default (200), as the data set is small
sqlContext.setConf("spark.sql.shuffle.partitions", "2")

# Daily revenue per product, considering only COMPLETE and CLOSED orders
sqlContext.sql("""
    SELECT o.order_date, p.product_name, sum(oi.order_item_subtotal) daily_revenue_per_product
    FROM orders o
    JOIN order_items oi ON o.order_id = oi.order_item_order_id
    JOIN products p ON p.product_id = oi.order_item_product_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date, p.product_name
    ORDER BY o.order_date, daily_revenue_per_product DESC
""").show()