alathrop

10/14/2019 - 3:54 PM

```
''' Data Camp '''
\!h ''' Building Recommendation Engines with PySpark '''
\!h ''' implicit ratings '''
\!h ''' note: implicit ratings use a value of zero for items a user has not "seen"/"used", '''
\!h ''' rather than the NULL used for missing explicit ratings. The .fillna(0) method can be used to replace those NULLs with 0. '''
''' example function to replace NULL with 0 '''
def add_zeros(df, user_col="userId", item_col="songId"):
    """Return every (user, item) pair found in *df*, with missing values filled as 0.

    The distinct users are cross-joined with the distinct items, and *df* is
    left-joined back onto that grid. Pairs that never occur in *df* come back
    with nulls, which ``fillna(0)`` replaces with 0. Note that ``fillna(0)`` also
    replaces nulls already present in *df*'s numeric columns.

    Args:
        df: Spark DataFrame containing ``user_col`` and ``item_col`` columns.
        user_col: Name of the user id column (default ``"userId"``).
        item_col: Name of the item id column (default ``"songId"``).

    Returns:
        A DataFrame with at least one row per (user, item) combination. There are
        more rows if *df* itself repeats a pair.
    """
    # Extract distinct users
    users = df.select(user_col).distinct()
    # Extract distinct items (songs by default)
    items = df.select(item_col).distinct()
    # Join users and items, then fill blanks with 0
    cross_join = users.crossJoin(items) \
        .join(df, [user_col, item_col], "left").fillna(0)
    return cross_join
# Spark column helpers used below. min is aliased so that the Python builtin
# min() is not shadowed.
from pyspark.sql.functions import avg, col
from pyspark.sql.functions import min as spark_min

# Look at the data
msd.show()
# Count the number of distinct userIds
user_count = msd.select("userId").distinct().count()
print("Number of users: ", user_count)
# Count the number of distinct songIds
song_count = msd.select("songId").distinct().count()
print("Number of songs: ", song_count)

# Only rows with at least one play count as an implicit rating. Rows with
# num_plays == 0 are presumably zero-filled "unobserved" pairs; confirm
# against how msd was built.
played = msd.filter(col("num_plays") > 0)

# Number of implicit ratings received by each song
ratings_per_song = played.groupBy("songId").count()
# Min num implicit ratings for a song
print("Minimum implicit ratings for a song: ")
ratings_per_song.select(spark_min("count")).show()
# Avg num implicit ratings per songs
print("Average implicit ratings per song: ")
ratings_per_song.select(avg("count")).show()

# Number of implicit ratings given by each user
ratings_per_user = played.groupBy("userId").count()
# Min num implicit ratings from a user
print("Minimum implicit ratings from a user: ")
ratings_per_user.select(spark_min("count")).show()
# Avg num implicit ratings from users
print("Average implicit ratings per user: ")
ratings_per_user.select(avg("count")).show()
''' another example of adding 0's for unobserved behavior '''
''' Many recommendation engines use implicit ratings. In many cases these datasets don't include behavior counts for items that a user has never purchased. In these cases, you'll need to add those user/item pairs yourself with a count of zero. The DataFrame Z is provided for you. It contains userIds, productIds, and num_purchases, which is the number of times a user has purchased a specific product. '''
# Show the raw purchase counts first
Z.show()
# Distinct user ids and distinct product ids, pulled out in one pass over the two column names
users, products = (Z.select(column).distinct() for column in ("userId", "productId"))
# Grid of every user paired with every product
cj = users.crossJoin(products)
# Left-join the observed purchases onto that grid. Pairs with no purchase
# record come back as null and are filled with 0.
Z_expanded = cj.join(Z, ["userId", "productId"], "left").fillna(0)
# Show the zero-filled result
Z_expanded.show()
''' Specify ALS Hyperparameters '''
# Candidate values for each ALS hyperparameter. The model-building loop creates
# one model for every combination of these values.
ranks = list(range(10, 41, 10))
maxIters = list(range(10, 41, 10))
regParams = [0.05, 0.10, 0.15]
alphas = list(range(20, 81, 20))
\!h ''' build implicit models '''
from itertools import product

from pyspark.ml.recommendation import ALS

# Create (but do not fit) one implicit-feedback ALS estimator per
# hyperparameter combination. Fitting happens later in the evaluation loop.
# itertools.product varies the last list (alphas) fastest and the first (ranks)
# slowest, the same order as nested for-loops over ranks/maxIters/regParams/alphas,
# so model_list indices line up with that nesting. The list is rebuilt from
# scratch, so re-running this cell does not append duplicate models.
model_list = [
    ALS(
        userCol="userId",
        itemCol="songId",
        ratingCol="num_plays",
        rank=r,
        maxIter=mi,
        regParam=rp,
        alpha=a,
        coldStartStrategy="drop",
        nonnegative=True,
        implicitPrefs=True,
    )
    for r, mi, rp, a in product(ranks, maxIters, regParams, alphas)
]
# Print the model list, and the length of model_list
print(model_list, "Length of model_list: ", len(model_list))
# Validate: there must be exactly one model per hyperparameter combination.
# An explicit raise is used because a bare comparison is discarded outside a console.
expected_model_count = len(ranks) * len(maxIters) * len(regParams) * len(alphas)
if len(model_list) != expected_model_count:
    raise RuntimeError(
        "expected %d models, built %d" % (expected_model_count, len(model_list))
    )
''' Running a Cross-Validated Implicit ALS Model '''
''' Now that we have several ALS models, each with a different set of hyperparameter values, we can train them on a training portion of the msd dataset using cross validation, and then run them on a test set of data and evaluate how well each one performs using the ROEM function discussed earlier. Unfortunately, this takes too much time for this exercise, so it has been done separately. But for your reference you can evaluate your model_list using the following loop (we are using the msd dataset in this case): '''
# Split the data into training and test sets. The seed makes the split, and
# therefore the ROEMs and best-model index computed from it, reproducible
# across runs. This is consistent with the seeded fold split below.
(training, test) = msd.randomSplit([0.8, 0.2], seed=1)
# Build 5 folds within the training set.
train1, train2, train3, train4, train5 = training.randomSplit(
    [0.2, 0.2, 0.2, 0.2, 0.2], seed=1
)
# foldK is the union of every part except trainK. It is used to fit the model,
# and trainK is held out to score it.
fold1 = train2.union(train3).union(train4).union(train5)
fold2 = train3.union(train4).union(train5).union(train1)
fold3 = train4.union(train5).union(train1).union(train2)
fold4 = train5.union(train1).union(train2).union(train3)
fold5 = train1.union(train2).union(train3).union(train4)
# (fit data, held-out data) pairs consumed by the cross-validation loop
foldlist = [
    (fold1, train1),
    (fold2, train2),
    (fold3, train3),
    (fold4, train4),
    (fold5, train5),
]
# Import numpy
import numpy
\!h ''' NOTE: The ROEM evaluation metric is a custom function that compares the percentile ranks of the
implicit ratings with the model's predicted ratings. The two are on different scales, so RMSE cannot be used.
Code: <https://github.com/jamenlong/ALS_expected_percent_rank_cv/blob/master/ROEM_cv.py>
A full example is explained in the DataCamp course. '''
# Find the index of the smallest ROEM.
# NOTE: ROEMS is populated by the cross-validation loop further down in these
# notes (one validation ROEM appended per entry of model_list); run that loop first.
i = numpy.argmin(ROEMS)
print("Index of smallest ROEM:", i)
# Find ith element of ROEMS
print("Smallest ROEM: ", ROEMS[i])
''' Extracting Parameters '''
# Extract the best_model. ROEMS[k] was computed for model_list[k], so the index
# of the smallest ROEM selects the matching ALS estimator.
best_model = model_list[i]
# Extract the Rank
print("Rank: ", best_model.getRank())
# Extract the MaxIter value
print("MaxIter: ", best_model.getMaxIter())
# Extract the RegParam value
print("RegParam: ", best_model.getRegParam())
# Extract the Alpha value
print("Alpha: ", best_model.getAlpha())
# Empty list to fill with ROEMs from each model (validation ROEM on `test`)
ROEMS = []
# Mean cross-validation ROEM per model, aligned index-for-index with model_list
# and ROEMS. This keeps the per-fold results instead of only printing them.
cv_ROEMS = []
# Loops through all models and all folds
for model in model_list:
    fold_roems = []
    for fit_data, holdout_data in foldlist:
        # Fits model to fold within training data
        fitted_model = model.fit(fit_data)
        # Generates predictions using fitted_model on respective CV test data
        predictions = fitted_model.transform(holdout_data)
        # Generates and prints a ROEM metric CV test data
        r = ROEM(predictions)
        print("ROEM: ", r)
        fold_roems.append(r)
    # Average the fold ROEMs (assumes ROEM returns a number); nan if there were no folds
    cv_ROEMS.append(sum(fold_roems) / len(fold_roems) if fold_roems else float("nan"))
    # Fits model to all of training data and generates preds for test data
    v_fitted_model = model.fit(training)
    v_predictions = v_fitted_model.transform(test)
    v_ROEM = ROEM(v_predictions)
    # Adds validation ROEM to ROEM list (one entry per model)
    ROEMS.append(v_ROEM)
    print("Validation ROEM: ", v_ROEM)
\!h ''' Binary Ratings Models '''
''' You've already built several ALS models, so we won't do that again. An implicit ALS model has already been fitted to the binary ratings of the MovieLens dataset. Let's look at the binary_test_predictions from this model to see what we can learn.
The ROEM() function has been defined for you. Feel free to run help(ROEM) in the console if you want more details on how to execute it! '''
# Import the col function
from pyspark.sql.functions import col

# Look at the test predictions
binary_test_predictions.show()
# Evaluate ROEM on test predictions. The value is printed because a bare
# expression's result is discarded outside an interactive console.
print("ROEM: ", ROEM(binary_test_predictions))
# Look at user 42's test predictions
binary_test_predictions.filter(col("userId") == 42).show()
# For each user, view the original ratings and then the model's recommendations
for user_id in (26, 99):
    print("User", user_id, "Original Ratings:")
    original_ratings.filter(col("userId") == user_id).show()
    print("User", user_id, "Recommendations:")
    binary_recs.filter(col("userId") == user_id).show()
```