# ---
# Databricks training
# ---
# Load the 10-million-person training dataset from DBFS.
peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

# BUGFIX: pyspark's round must be imported explicitly — the builtin round()
# cannot be applied to a Column and raises TypeError.
from pyspark.sql.functions import avg, min, max, round

# Average salary across the whole dataset, as a single-row DataFrame.
avgSalaryDF = peopleDF.select(avg("salary").alias("averageSalary"))
avgSalaryDF.show()

# Min, max, and rounded average salary computed in one pass.
salaryDF = peopleDF.select(
    max("salary").alias("max"),
    min("salary").alias("min"),
    round(avg("salary")).alias("averageSalary"),
)
salaryDF.show()
#---
# absolute value
# ---
# BUGFIX: col was used here before being imported (the original file only
# imports it further down), and pyspark's round was never imported — both
# fail when the file runs top to bottom.
from pyspark.sql.functions import abs, col, round

# Negative salaries in the raw data are treated as sign errors; repair with abs().
peopleWithFixedSalariesDF = peopleDF.select(
    "firstName", "middleName", "lastName", "gender", "birthDate", "ssn",
    abs(col("salary")).alias("salary"),
)
display(peopleWithFixedSalariesDF)

""" Starting with the peopleWithFixedSalariesDF DataFrame, create another DataFrame called PeopleWithFixedSalariesSortedDF where:
The data set has been reduced to the first 20 records.
The records are sorted by the column salary in ascending order. """
# The 20 lowest salaries; .select("*") was a no-op and has been dropped.
peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.orderBy("salary").limit(20)
display(peopleWithFixedSalariesSortedDF)

""" As a refinement, assume all salaries under $20,000 represent bad rows and filter them out.
Additionally, categorize each person's salary into $10K groups. NOTE: .withColumn creates a new column"""
# NOTE(review): the spec says salaries *under* $20,000 are bad, but the filter
# uses "> 20000", which also drops rows at exactly $20,000 — confirm the
# intended boundary ("salary >= 20000") with the exercise answer key.
peopleWithFixedSalaries20KDF = (
    peopleWithFixedSalariesDF
    .withColumn("salary10k", round(col("salary") / 10000))
    .filter("salary > 20000")
)
display(peopleWithFixedSalaries20KDF)
""" Exercise 3 Using the peopleDF DataFrame, count the number of females named Caren who were born before March 1980. """
carensDF = (peopleDF
.filter("firstName = 'Caren'")
.filter("birthDate < '1980-03-01' ")
.filter("gender = 'F' ")
.agg(count("*").alias("total"))
)
display(carensDF)
"""
Review Questions
Q: What is the DataFrame equivalent of the SQL statement SELECT count(*) AS total
A: .agg(count("*").alias("total"))
"""
# ---
# join
# ---
# Distinct first names from the people dataset (one side of the join).
peopleDistinctNamesDF = peopleDF.select("firstName").distinct()
peopleDistinctNamesDF.count()
# In preparation for the join, let's rename the firstName column to ssaFirstName in the Social Security DataFrame.
# Renaming avoids an ambiguous-column error when both sides carry "firstName".
ssaDistinctNamesDF = ssaDF.select("firstName").withColumnRenamed("firstName", 'ssaFirstName').distinct()
ssaDistinctNamesDF.count()
# Now join the two DataFrames (inner join is the default join type).
from pyspark.sql.functions import col
joinedDF = peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName"))
joinedDF.count()
"""
Review Questions
Q: What is the DataFrame equivalent of the SQL statement SELECT firstName FROM PeopleDistinctNames INNER JOIN SSADistinctNames ON firstName = ssaFirstName
A: peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName"))
"""
# created aggregated summary table (example from Megan from BDX)
# BUGFIX: the original line began with "\!h", a notebook-export artifact that
# is a Python syntax error; it has been converted to this comment.
# NOTE(review): the wildcard import is kept in case later (unseen) code relies
# on it, but it shadows the builtins min/max/abs/round/sum.
from pyspark.sql.functions import *

# One row per community: rating count, distinct shoppers, and min/max/avg rating.
communityAnalysis = (
    ratings.groupby("communityId")
    .agg(
        count(lit(1)).alias("numOfRatings"),
        countDistinct("shopperId").alias("numOfShoppers"),
        min("rating").alias("minRatingTotal"),
        max("rating").alias("maxRatingTotal"),
        avg("rating").alias("avgRatingTotal"),
    )
)