# ---
# Databricks training
# ---
"""
MLlib can refer either to Spark's machine learning library as a whole or to its original RDD-based API. Spark ML refers to the newer DataFrame-based API, which is preferred over working on RDDs wherever possible.
Spark's machine learning library, MLlib, has three main abstractions:
A transformer takes a DataFrame as an input and returns a new DataFrame with one or more columns appended to it.
Transformers implement a .transform() method.
An estimator takes a DataFrame as an input and returns a model, which itself is a transformer.
Estimators implement a .fit() method.
A pipeline chains transformers and estimators together to make it easier to combine multiple algorithms.
Pipelines implement a .fit() method.
These basic building blocks form the machine learning process in Spark from featurization through model training and deployment.
Machine learning models are only as strong as the data they see, and they can only work on numerical data. Featurization is the process of creating this numerical input for a model. Common featurization approaches include:
Encoding categorical variables
Normalizing
Creating new features
Handling missing values
Binning/discretizing
"""
"""
Categorical features refer to a discrete number of groups. In the case of the AirBnB dataset we'll use in this lesson, one categorical variable is room type. There are three types of rooms: Private room, Entire home/apt, and Shared room.
A machine learning model does not know how to handle these room types. Instead, we must first encode each unique string into a number. Second, we must one-hot encode each of those values to a location in an array. This allows our machine learning algorithms to model the effect of each category.
Room type       | Room type index | One-hot encoded room type index
Private room    | 0               | [1, 0]
Entire home/apt | 1               | [0, 1]
Shared room     | 2               | [0, 0]
"""
"""
Take the unique values of room_type and map each one to a numerical index. Fit the StringIndexer estimator on the unique room types by calling its .fit() method with the data.
The fitted StringIndexer returns a model, which is a transformer. Use its .transform() method on the data to append the new index column.
"""
from pyspark.ml.feature import StringIndexer
uniqueTypesDF = airbnbDF.select("room_type").distinct() # Use distinct values to demonstrate how StringIndexer works
indexer = StringIndexer(inputCol="room_type", outputCol="room_type_index") # Set input column and new output column
indexerModel = indexer.fit(uniqueTypesDF) # Fit the indexer to learn room type/index pairs
indexedDF = indexerModel.transform(uniqueTypesDF) # Append a new column with the index
display(indexedDF)
""" output
indexedDF:pyspark.sql.dataframe.DataFrame = [room_type: string, room_type_index: double]
room_type         room_type_index
Shared room       2
Entire home/apt   1
Private room      0
"""
"""
Now each room type has a unique numerical value assigned. While we could pass the new room_type_index straight into a machine learning model, the model would treat the index as a magnitude and assume that Shared room (2) is worth twice as much as Entire home/apt (1), which is not the case. Instead, we need to turn these indices into binary yes/no values indicating whether a listing is a shared room, an entire home, or a private room.
Do this by fitting the OneHotEncoderEstimator, which only operates on numerical values (this is why we needed to use StringIndexer first).
Side note: Certain models, such as random forests, do not need one-hot encoding (and can actually be negatively affected by it). The models we'll explore in this course, however, do need it.
"""
from pyspark.ml.feature import OneHotEncoderEstimator  # In Spark 3.0+ this class is named OneHotEncoder
encoder = OneHotEncoderEstimator(inputCols=["room_type_index"], outputCols=["encoded_room_type"])  # Encode the index column as a one-hot vector
encoderModel = encoder.fit(indexedDF)  # Fit the encoder to learn how many categories there are
encodedDF = encoderModel.transform(indexedDF)  # Append the one-hot encoded column
display(encodedDF)
"""
encodedDF:pyspark.sql.dataframe.DataFrame = [room_type: string, room_type_index: double ... 1 more fields]
room_type         room_type_index   encoded_room_type
Shared room       2                 [0,2,[],[]]
Entire home/apt   1                 [0,2,[1],[1]]
Private room      0                 [0,2,[0],[1]]
The new column encoded_room_type is a vector. The difference between a sparse and a dense vector is whether Spark records every value or only the non-zero ones. In a sparse vector, like we see here, Spark saves space by recording only the positions where the vector has a non-zero value. In this display, the 0 in the first position indicates a sparse vector, the second value is the length of the vector, and the two lists that follow are the non-zero indices and their values.
Here's how to read the mapping above:
Shared room maps to the vector [0, 0]
Entire home/apt maps to the vector [0, 1]
Private room maps to the vector [1, 0]
"""
"""
Imputing Null or Missing Data
Null values refer to unknown or missing data as well as irrelevant responses. Strategies for dealing with this scenario include:
Dropping these records: Works when you do not need to use the information for downstream workloads
Adding a placeholder (e.g. -1): Allows you to see missing data later on without violating a schema (a short sketch follows this list)
Basic imputing: Allows you to have a "best guess" of what the data could have been, often by using the mean of non-missing data
Advanced imputing: Determines the "best guess" of what the data should be, using more advanced strategies such as clustering algorithms or oversampling techniques such as SMOTE.
"""
# Try dropping records with null values in selected columns
countWithoutDropping = airbnbDF.count()
countWithDropping = airbnbDF.na.drop(subset=["zipcode", "host_is_superhost"]).count()
print("Count without dropping nulls:\t", countWithoutDropping)
print("Count with dropping nulls:\t", countWithDropping)
"""
Count without dropping nulls: 4804
Count with dropping nulls: 4746
"""
# Another common option for working with missing data is to impute the missing values with a best guess for their value. Try imputing a list of columns with their median.
from pyspark.ml.feature import Imputer
imputeCols = [
    "host_total_listings_count",
    "bathrooms",
    "beds",
    "review_scores_rating",
    "review_scores_accuracy",
    "review_scores_cleanliness",
    "review_scores_checkin",
    "review_scores_communication",
    "review_scores_location",
    "review_scores_value"
]
imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)  # Replace nulls with each column's median, in place
imputerModel = imputer.fit(airbnbDF)  # Compute the medians
imputedDF = imputerModel.transform(airbnbDF)  # Fill the nulls
display(imputedDF)
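# Optional sanity check (not part of the original walkthrough): the imputed columns should no
# longer contain nulls.
from pyspark.sql.functions import col
print("Nulls in bathrooms before:", airbnbDF.filter(col("bathrooms").isNull()).count())
print("Nulls in bathrooms after:", imputedDF.filter(col("bathrooms").isNull()).count())  # expect 0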
# Another common option: replace nulls with a fixed value (here, the string '0' in a column from a different dataset)
# https://stackoverflow.com/questions/42312042/how-to-replace-all-null-values-of-a-dataframe-in-pyspark
gaData = gaData.fillna({'totals_bounces': '0'})  # Fill nulls in totals_bounces with '0'
"""
Creating a Pipeline
Passing around estimator objects, trained estimators, and transformed DataFrames quickly becomes cumbersome. Spark follows the convention established by scikit-learn and combines these steps into a single pipeline.
The pipeline is itself an estimator: train the model with its .fit() method and then transform the original dataset. This combines all of our featurization steps into one pipeline with three stages.
"""
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, imputer])  # The three featurization stages built above
pipelineModel = pipeline.fit(airbnbDF)  # Fit every estimator stage in order
transformedDF = pipelineModel.transform(airbnbDF)  # Apply all of the resulting transformers
display(transformedDF)
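# A fitted PipelineModel can also be persisted and reloaded, which is handy once the
# featurization steps are settled. The path below is a placeholder; adjust it to your workspace.
from pyspark.ml import PipelineModel
pipelineModel.write().overwrite().save("/tmp/airbnb_featurization_pipeline")  # Placeholder path
reloadedPipelineModel = PipelineModel.load("/tmp/airbnb_featurization_pipeline")
display(reloadedPipelineModel.transform(airbnbDF))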
# exercise
"""
Step 1: Binning review_scores_rating
Divide the hosts by whether their review_scores_rating is above 97. Do this using the transformer Binarizer with the output column high_rating. This should create the objects binarizer and the transformed DataFrame transformedBinnedDF.
Hint: Binarizer is a transformer, so it does not have a .fit() method.
"""
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=97, inputCol="review_scores_rating", outputCol="high_rating")  # 1.0 when the rating is above 97, else 0.0
transformedBinnedDF = binarizer.transform(transformedDF)  # Transformers are applied directly, no .fit() needed
display(transformedBinnedDF)
"""
Step 2: Regular Expressions on Strings
Clean the column price by creating two new columns:
price: a new column that contains a cleaned version of price. This can be done using the regular expression replacement of "[\$,]" with "". Cast the column as a decimal.
price_raw: the column price in its current form
"""
from pyspark.sql.functions import col, regexp_replace
transformedBinnedRegexDF = (transformedBinnedDF
    .withColumnRenamed("price", "price_raw")  # Keep the original string column
    .withColumn("price", regexp_replace(col("price_raw"), r"[\$,]", "").cast("Decimal(10,2)"))  # Strip "$" and "," then cast to decimal
)
display(transformedBinnedRegexDF)
"""
Step 3: Filter Extremes
The dataset contains extreme values, including negative prices and minimum stays of over one year. Filter out all prices of $0 or less and all minimum_nights values greater than 365. Save the results to transformedBinnedRegexFilteredDF.
"""
from pyspark.sql.functions import col
transformedBinnedRegexFilteredDF = (transformedBinnedRegexDF
    .filter(col("price") > 0)  # Keep only positive prices
    .filter(col("minimum_nights") <= 365)  # Keep minimum stays of at most one year
)
display(transformedBinnedRegexFilteredDF)
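# Quick optional check that the filters behaved as intended: the minimum price should now be
# positive and the largest minimum_nights at most 365.
from pyspark.sql.functions import min as sql_min, max as sql_max
display(transformedBinnedRegexFilteredDF.select(sql_min("price"), sql_max("minimum_nights")))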
"""
Review
Question: What's the difference between a transformer, estimator, and pipeline?
Answer: The Spark machine learning API and feature library is based on these main abstractions:
Transformers transform your data by appending one or more new columns to a DataFrame.
Estimators learn something about your data and implement a .fit() method. A fitted estimator returns a model, which is itself a transformer.
Pipelines link together transformers and estimators into a single object for convenience.
Question: How do you handle categorical features?
Answer: Categorical features are a rich subject, so much so that there is a field dedicated to their study: discrete mathematics. The most common way of handling categorical features is to one-hot encode them, where each unique value is translated to a position in an array. There are a host of other techniques as well. For instance, high-cardinality features are categorical features with many unique values. In this case, one-hot encoding would create too many dimensions. One alternative is to bin the values to reduce the number of features while still contributing some information to the machine learning model (see the short sketch after this review section).
Question: What's the best way to handle null values?
Answer: The answer depends largely on what you hope to do with your data moving forward. You can drop null values or impute them with a number of different techniques. For instance, clustering your data to fill null values with the values of nearby neighbors often gives more insight to machine learning models than using a simple mean.
"""