alathrop
8/14/2019 - 2:25 PM

Spark aggregations and joins

# ---
# Databricks training
# ---

peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

from pyspark.sql.functions import avg
avgSalaryDF = peopleDF.select(avg("salary").alias("averageSalary"))

avgSalaryDF.show()

from pyspark.sql.functions import min, max, round
salaryDF = peopleDF.select(max("salary").alias("max"),
                           min("salary").alias("min"),
                           round(avg("salary")).alias("averageSalary"))

salaryDF.show()
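# The data set contains bad rows with negative salaries (that is what the min
# above surfaces), which the absolute-value fix in the next section addresses.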

# ---
# absolute value
# ---
from pyspark.sql.functions import abs, col
peopleWithFixedSalariesDF = peopleDF.select("firstName", "middleName", "lastName", "gender",
                                            "birthDate", "ssn", abs(col("salary")).alias("salary"))
display(peopleWithFixedSalariesDF)

""" Starting with the peopleWithFixedSalariesDF DataFrame, create another DataFrame called PeopleWithFixedSalariesSortedDF where:

    The data set has been reduced to the first 20 records.
    The records are sorted by the column salary in ascending order. """
    
peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.select("*").orderBy("salary").limit(20)
display(peopleWithFixedSalariesSortedDF)

""" As a refinement, assume all salaries under $20,000 represent bad rows and filter them out.
Additionally, categorize each person's salary into $10K groups. NOTE: .withColumn creates a new column"""
peopleWithFixedSalaries20KDF = peopleWithFixedSalariesDF.select("*").withColumn("salary10k", round(col("salary")/10000)).filter("salary > 20000")
display(peopleWithFixedSalaries20KDF)
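# Aside: round() bins to the *nearest* $10K, so a $26,000 salary lands in group 3,
# not group 2. If true floor-style buckets are wanted instead, floor() does that.
# A minimal sketch against the same DataFrame (salary10kFloor is my name, not the course's):
from pyspark.sql.functions import floor
salaryFloorBucketsDF = peopleWithFixedSalaries20KDF.withColumn("salary10kFloor", floor(col("salary")/10000))
display(salaryFloorBucketsDF)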

""" Exercise 3 Using the peopleDF DataFrame, count the number of females named Caren who were born before March 1980. """
carensDF = (peopleDF
  .filter("firstName = 'Caren'")
  .filter("birthDate < '1980-03-01' ")
  .filter("gender = 'F' ")
  .agg(count("*").alias("total"))
   )
display(carensDF)

"""
Review Questions
Q: What is the DataFrame equivalent of the SQL statement SELECT count(*) AS total?
A: .agg(count("*").alias("total"))
"""
# ---
# join
# ---

peopleDistinctNamesDF = peopleDF.select("firstName").distinct()

peopleDistinctNamesDF.count()

# In preparation for the join, rename the firstName column to ssaFirstName in the
# Social Security DataFrame. (ssaDF is assumed to have been loaded earlier in the
# course from the SSA names data set; it is not defined in these notes.)
ssaDistinctNamesDF = ssaDF.select("firstName").withColumnRenamed("firstName", "ssaFirstName").distinct()

ssaDistinctNamesDF.count()

# Now join the two DataFrames.
from pyspark.sql.functions import col
joinedDF = peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName"))

joinedDF.count()

"""
Review Questions
Q: What is the DataFrame equivalent of the SQL statement SELECT firstName FROM PeopleDistinctNames INNER JOIN SSADistinctNames ON firstName = ssaFirstName?
A: peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName")).select("firstName")
"""

# ---
# create an aggregated summary table (example from Megan from BDX)
# ---
# (ratings is assumed to be a DataFrame with communityId, shopperId, and rating
# columns; it is not defined in these notes.)
from pyspark.sql.functions import *
communityAnalysis = (ratings
  .groupBy("communityId")
  .agg(count(lit(1)).alias("numOfRatings"),
       countDistinct("shopperId").alias("numOfShoppers"),
       min("rating").alias("minRatingTotal"),
       max("rating").alias("maxRatingTotal"),
       avg("rating").alias("avgRatingTotal")))