# dgadiraju
# 7/30/2017 - 3:47 PM
#
# pyspark-groupByKey-denserank.py

# Location of the retail_db dataset; pick the one matching your environment.
# (The original line tried to assign both at once, which is a SyntaxError.)
# path = "/Users/itversity/Research/data/retail_db"
path = "/public/retail_db"

# NOTE(review): `sc` (a SparkContext) is not defined in this file; presumably
# it is supplied by the pyspark shell — confirm before running as a script.
products = sc.textFile(path + "/products")

# Group product lines by the integer in field 1 (used as the category id),
# dropping lines whose field 4 (the price) is empty so that the float()
# conversions done later never see "".
productsGroupByCategory = (
    products
    .filter(lambda product: product.split(",")[4] != "")
    .map(lambda p: (int(p.split(",")[1]), p))
    .groupByKey()
)

# Exploring Python APIs to get the top 5 priced products (dense rank) of the
# first category returned by productsGroupByCategory.
categoryProducts = list(productsGroupByCategory.first()[1])

# Distinct prices, highest first, keeping the 5 largest. De-duplicate BEFORE
# sorting: a set does not preserve order, so list(set(sorted(...)))[0:5]
# would pick 5 arbitrary prices rather than the 5 highest.
distinctPrices = {float(rec.split(",")[4]) for rec in categoryProducts}
topNPrices = set(sorted(distinctPrices, reverse=True)[:5])

# Print every product whose price is one of those top prices, highest first.
sortedProducts = sorted(categoryProducts, key=lambda rec: -float(rec.split(",")[4]))
for product in sortedProducts:
    if float(product.split(",")[4]) in topNPrices:
        print(product)

def topN(l, topN):
    """Yield the records of one group whose price is among its top-N distinct prices.

    Dense-rank semantics: all records sharing one of the ``topN`` highest
    distinct prices are returned, so more than ``topN`` records may come back
    when prices tie.

    :param l: a ``(key, records)`` pair such as the ones produced by
        ``groupByKey``; each record is a comma-separated line whose field 4
        parses as a float price.
    :param topN: number of distinct price levels to keep.
    :return: a generator over the qualifying records, highest price first;
        records with equal prices keep their original relative order.
    """
    # Parse each price once and keep it alongside its record. sorted()/sort()
    # with reverse=True stays stable, matching the original key=-price sort.
    priced = [(float(rec.split(",")[4]), rec) for rec in l[1]]
    priced.sort(key=lambda pair: pair[0], reverse=True)

    # De-duplicate first, then sort: the original list(set(sorted(...)))
    # discarded the ordering and so kept arbitrary price levels.
    topNPrices = set(sorted({price for price, _ in priced}, reverse=True)[:topN])

    return (rec for price, rec in priced if price in topNPrices)
    

# Getting the top 5 priced products (dense rank) of every category using
# Spark and Python: topN() runs per group inside flatMap, and collect()
# pulls every resulting record back to the driver before printing.
for product in productsGroupByCategory.flatMap(lambda rec: topN(rec, 5)).collect():
    print(product)