# core-spark-filtering-data-accumulators.py

#Check out our lab for practice: https://labs.itversity.com
#Filtering the Data and using accumulators
orders = sc.textFile("/public/retail_db/orders")
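
#Each record is a comma-separated line; based on index 3 being used below,
#the layout is assumed to be: order_id, order_date, order_customer_id, order_status
orders.first()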

def isComplete(order, ordersCompletedCount, ordersNonCompletedCount):
  orderStatus = order.split(",")[3]
  isCompleted = orderStatus == "COMPLETE" or orderStatus == "CLOSED"
  #Accumulator.add() mutates the accumulator in place and returns None,
  #so its result must not be assigned back to the variable
  if(isCompleted):
    ordersCompletedCount.add(1)
  else:
    ordersNonCompletedCount.add(1)
  return isCompleted

ordersCompletedCount = sc.accumulator(0)
ordersNonCompletedCount = sc.accumulator(0)

ordersFiltered = orders.\
filter(lambda order: isComplete(order, ordersCompletedCount, ordersNonCompletedCount))
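
#Note (standard Spark behavior, not part of the original lab): accumulator
#updates made inside a transformation such as filter() can be applied more
#than once if the RDD is recomputed, e.g. by take(10) after count() below.
#Caching the filtered RDD keeps the counts from being inflated:
ordersFiltered.cache()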

#We need to perform an action to evaluate the accumulators
ordersFiltered.count()
ordersCompletedCount.value
ordersNonCompletedCount.value

for order in ordersFiltered.take(10): print(order)
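
#Optional cross-check (a sketch, not part of the original lab): the same
#breakdown can be derived without accumulators by counting statuses directly
statusCounts = orders.map(lambda order: order.split(",")[3]).countByValue()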
#Raise any issues on https://discuss.itversity.com - make sure to categorize properly