# ---
# Databricks training
# ---
"""
Hyperparameter Tuning
Hyperparameter tuning is the process of choosing the optimal hyperparameters for a machine learning algorithm. Each algorithm has different hyperparameters to tune. You can explore these hyperparameters by calling the .explainParams() method on a model.
Grid search is the process of exhaustively trying every combination of hyperparameters. It takes all of the values we want to test and combines them in every possible way so that we test them using cross-validation.
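As a rough illustration of "every combination", the sketch below enumerates a grid in plain Python with itertools.product. The parameter names mirror LinearRegression settings, but the `grid` dictionary and the `expand_grid` helper are hypothetical and are not part of Spark.

```python
from itertools import product

def expand_grid(grid):
    # Return one dict per combination of the candidate values in `grid`.
    names = list(grid)
    value_lists = [grid[name] for name in names]
    return [dict(zip(names, values)) for values in product(*value_lists)]

# Hypothetical candidate values for three hyperparameters
grid = {
    "maxIter": [1, 10, 100],
    "fitIntercept": [True, False],
    "standardization": [True, False],
}

combinations = expand_grid(grid)
print(len(combinations))  # 3 * 2 * 2 = 12 combinations
print(combinations[0])    # the first combination in the grid
```

Every value added to one list multiplies the total, which is why grids grow so quickly.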
Start by performing a train/test split on the Boston dataset and building a pipeline for linear regression.
"""
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
bostonDF = (spark.read
.option("HEADER", True)
.option("inferSchema", True)
.csv("/mnt/training/bostonhousing/bostonhousing/bostonhousing.csv")
.drop("_c0")
)
trainDF, testDF = bostonDF.randomSplit([0.8, 0.2], seed=42)
assembler = VectorAssembler(inputCols=bostonDF.columns[:-1], outputCol="features")
lr = (LinearRegression()
.setLabelCol("medv")
.setFeaturesCol("features")
)
pipeline = Pipeline(stages = [assembler, lr])
# Take a look at the model parameters using the .explainParams() method.
print(lr.explainParams())
"""
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: medv)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
regParam: regularization parameter (>= 0). (default: 0.0)
solver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (default: auto)
standardization: whether to standardize the training features before fitting the model. (default: True)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)
"""
"""
ParamGridBuilder() allows us to string together all of the different possible hyperparameters we would like to test. In this case, we can test the maximum number of iterations, whether we want to use an intercept with the y axis, and whether we want to standardize our features.
Caution: Because grid search exhaustively builds a model for each combination of parameters, the number of unique combinations grows quickly as values are added.
"""
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(lr.maxIter, [1, 10, 100])
.addGrid(lr.fitIntercept, [True, False])
.addGrid(lr.standardization, [True, False])
.build()
)
# Now paramGrid contains all of the combinations we will test in the next step. Take a look at what it contains.
paramGrid
"""
Out[11]: [{Param(parent='LinearRegression_a399d98c2750', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
Param(parent='LinearRegression_a399d98c2750', name='maxIter', doc='max number of iterations (>= 0).'): 1,
Param(parent='LinearRegression_a399d98c2750', name='fitIntercept', doc='whether to fit an intercept term.'): True},
{Param(parent='LinearRegression_a399d98c2750', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
Param(parent='LinearRegression_a399d98c2750', name='maxIter', doc='max number of iterations (>= 0).'): 1,
Param(parent='LinearRegression_a399d98c2750', name='fitIntercept', doc='whether to fit an intercept term.'): False},
{Param(parent='LinearRegression_a399d98c2750', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
Param(parent='LinearRegression_a399d98c2750', name='maxIter', doc='max number of iterations (>= 0).'): 10,
Param(parent='LinearRegression_a399d98c2750', name='fitIntercept', doc='whether to fit an intercept term.'): True},
{Param(parent='LinearRegression_a399d98c2750', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
"""
"""
Cross-Validation
There are a number of different ways of conducting cross-validation, allowing us to trade off between computational expense and model performance. An exhaustive approach to cross-validation would include every possible split of the training set. More commonly, k-fold cross-validation is used, where the training dataset is divided into k smaller sets, or folds. A model is then trained on k-1 folds of the training data and the remaining fold is used to evaluate its performance. This is repeated k times so that each fold serves as the evaluation set once, and the k scores are averaged.
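The k-fold procedure can be sketched in plain Python. This is a minimal illustration, not how Spark implements it: rows are assigned to folds deterministically (row i goes to fold i % k) rather than at random, the "model" simply predicts the mean of its training labels, and the `values` list is made-up data. The helper names are hypothetical.

```python
def k_fold_indices(n_rows, k):
    # Assign row i to fold i % k, then yield (train, validation) index lists.
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for held_out in range(k):
        validation = folds[held_out]
        train = [i for j, fold in enumerate(folds) if j != held_out for i in fold]
        yield train, validation

def cross_validate(values, k):
    # Toy "model": predict the mean of the training values; score each fold with RMSE.
    scores = []
    for train, validation in k_fold_indices(len(values), k):
        mean = sum(values[i] for i in train) / len(train)
        mse = sum((values[i] - mean) ** 2 for i in validation) / len(validation)
        scores.append(mse ** 0.5)
    return sum(scores) / k  # average score across the k folds

values = [21.0, 24.5, 19.0, 30.0, 27.5, 22.0]  # made-up labels
splits = list(k_fold_indices(len(values), 3))
print(splits[0])                  # train and validation indices for the first fold
print(cross_validate(values, 3))  # average RMSE over the 3 folds
```

Each row lands in a validation fold exactly once, so every row contributes to the averaged score.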
Create a RegressionEvaluator() to evaluate our grid search experiments and a CrossValidator() to build our models.
"""
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
evaluator = RegressionEvaluator(
labelCol = "medv",
predictionCol = "prediction"
)
cv = CrossValidator(
estimator = pipeline, # Estimator (individual model or pipeline)
estimatorParamMaps = paramGrid, # Grid of parameters to try (grid search)
evaluator=evaluator, # Evaluator
numFolds = 3, # Set k to 3
seed = 42 # Seed to ensure our results are the same if run again
)
"""
Fit the CrossValidator()
Side note: This will train a large number of models. If your cluster is small, it could take a while.
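To get a feel for how many models that is, the count can be estimated in plain Python: k-fold cross-validation fits one model per parameter combination per fold, and CrossValidator then refits the best combination once on the full training set. The `count_fits` helper is hypothetical and only does the arithmetic.

```python
from math import prod

def count_fits(values_per_param, num_folds):
    # Grid size is the product of the number of candidate values per parameter.
    grid_size = prod(values_per_param)
    # One fit per combination per fold, plus one final refit of the best combination.
    return grid_size * num_folds + 1

print(count_fits([3, 2, 2], 3))     # 12 combinations * 3 folds + 1 = 37
print(count_fits([3, 2, 2, 5], 3))  # adding a 5-value parameter: 60 * 3 + 1 = 181
```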
"""
cvModel = cv.fit(trainDF)
# Take a look at the scores from the different experiments.
for params, score in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    # Print each parameter name and value, followed by the average cross-validated score
    print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
    print("\tScore: {}".format(score))
"""
standardization True maxIter 1 fitIntercept True
Score: 4.741212462785249
standardization True maxIter 1 fitIntercept False
Score: 4.967762731293512
standardization True maxIter 10 fitIntercept True
Score: 4.741212462785249
"""
# You can then access the best model using the .bestModel attribute.
bestModel = cvModel.bestModel
"""
Saving Models and Predictions
Spark can save both the trained model we created as well as the predictions. For online predictions such as on a stream of new data, saving the trained model and using it with Spark Streaming is a common application. It's also common to retrain an algorithm as a nightly batch process and save the results to a database or parquet table for later use.
Save the best model.
"""
modelPath = userhome + "/cvPipelineModel"
dbutils.fs.rm(modelPath, recurse=True)
cvModel.bestModel.save(modelPath)
# Take a look at where it saved.
dbutils.fs.ls(modelPath)
# Save predictions made on testDF.
predictionsPath = userhome + "/modelPredictions.parquet"
cvModel.bestModel.transform(testDF).write.mode("OVERWRITE").parquet(predictionsPath)
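# Exercise
# Create a pipeline on the breast cancer data.
# Note: `cols` and `trainCancerDF` are not defined in this section; they are assumed
# to have been created when the cancer data was loaded and split earlier in the exercise.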
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
indexer = StringIndexer(inputCol="class", outputCol="is-malignant")
assembler = VectorAssembler(inputCols=cols[2:-1], outputCol="features")
logr = LogisticRegression(featuresCol="features", labelCol="is-malignant")
cancerPipeline = Pipeline(stages = [indexer, assembler, logr])
# hyperparameter grid
from pyspark.ml.tuning import ParamGridBuilder
cancerParamGrid = (ParamGridBuilder()
.addGrid(logr.regParam, [0., .2, .8, 1.])
.addGrid(logr.fitIntercept, [True, False])
.build()
)
# fit cross-validation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
binaryEvaluator = BinaryClassificationEvaluator(
labelCol = "is-malignant",
metricName = "areaUnderROC"
)
cancerCV = CrossValidator(
estimator = cancerPipeline, # Estimator (individual model or pipeline)
estimatorParamMaps = cancerParamGrid, # Grid of parameters to try (grid search)
evaluator=binaryEvaluator, # Evaluator
numFolds = 3, # Set k to 3
seed = 42 # Seed to ensure our results are the same if run again
)
cancerCVModel = cancerCV.fit(trainCancerDF)
# examine results
for params, score in zip(cancerCVModel.getEstimatorParamMaps(), cancerCVModel.avgMetrics):
    # Print each parameter name and value, followed by the average cross-validated score
    print("".join([param.name+"\t"+str(params[param])+"\t" for param in params]))
    print("\tScore: {}".format(score))
"""
fitIntercept True regParam 0.0
Score: 0.9914152780939816
fitIntercept True regParam 0.2
Score: 0.992053680040891
fitIntercept True regParam 0.8
Score: 0.9916311809550931
fitIntercept True regParam 1.0
Score: 0.9916311809550932
"""
"""
Review
Question: What are hyperparameters and how are they used?
Answer: A hyperparameter is a parameter, or setting, for a machine learning model that must be set before training the model. Since a hyperparameter cannot be learned from the data itself, many different values should be tested in order to find the set that performs best.
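The distinction can be seen in a small plain-Python sketch. Here the slope w is learned from the data, while the regularization strength is a hyperparameter fixed before fitting. This is a generic one-feature ridge regression without an intercept, not Spark's exact objective, and the `fit_slope` helper and data are made up for illustration.

```python
def fit_slope(xs, ys, reg_param):
    # Closed-form minimizer of sum((y - w*x)^2) + reg_param * w^2
    sxy = sum(x * y for x, y in zip(xs, ys))
    sxx = sum(x * x for x in xs)
    return sxy / (sxx + reg_param)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]  # made-up data following y = 2x

# reg_param is chosen by us; the slope is what training learns for each choice
for reg_param in [0.0, 1.0, 10.0]:
    print(reg_param, fit_slope(xs, ys, reg_param))
```

Larger values of reg_param shrink the learned slope toward zero, so the only way to know which value works best is to try several and compare their validation scores.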
Question: How can I further improve my predictions?
Answer: There are a number of different strategies including:
Different models: train different models such as a random forest or gradient boosted trees
Expert knowledge: combine the current pipeline with domain expertise about the data
Better tuning: continue tuning with more hyperparameters to choose from
Feature engineering: create new features for the model to train on
Ensemble models: combining predictions from multiple models can produce better results than any one model
Question: Why is cross-validation an optimal strategy for model selection?
Answer: Cross-validation is an optimal strategy for model selection because it makes the most of the available data while guarding against overfitting: every row is used for both training and evaluation, but never in the same fold. Cross-validation is also a flexible technique that allows the number of folds (k) to be adjusted to balance the cost of training against the performance of the final model. If compute time is not an issue, a larger k lets each model train on more of the data. Cross-validation is also embarrassingly parallel, as different models can be trained and validated in parallel.
Question: Now that I've trained my model, how can I use it?
Answer: There are a few different options. Commonly, predictions are calculated in batch and saved to a database where they can be served when they are needed. In the case of stream processing, models can predict on incoming streams of data. Spark can also integrate well with other model serialization formats such as MLeap. Model serving will be covered in greater detail in later courses.
"""