Spark learning
import org.apache.spark.sql.SparkSession
// Sample session creation
val spark = SparkSession
  .builder()
  .appName("Session Creation Example")
  .config("spark.some.config.option", "value")
  .getOrCreate()

// Needed for .toDF, the $ column syntax, and .as[Int] used below
import spark.implicits._
// Example Code for spark
// "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"
// Reading data from a JSON file
val df = spark.read.json("examples/src/main/resources/people.json")
// Display the contents of the DataFrame to stdout
df.show()
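// A quick sanity check on what Spark inferred from the JSON (a minimal sketch;
// people.json in the Spark examples has "name" and "age" fields)
df.printSchema()
df.select("name").show()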
// Chapter 3:
// Preparing the dataset
val rawUserArtistData = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/user_artist_data.txt")
rawUserArtistData.take(5).foreach(println)
// Each line is "userID artistID playCount"; keep only the first two fields
val userArtistDF = rawUserArtistData.map { line =>
  val Array(user, artist, _*) = line.split(" ")
  (user.toInt, artist.toInt)
}.toDF("user", "artist")
import org.apache.spark.sql.functions._
// Compute min/max user and artist IDs in a single aggregation
val userArtistMinMax = userArtistDF.agg(
  min("user").as("min_user"), max("user").as("max_user"),
  min("artist").as("min_artist"), max("artist").as("max_artist"))
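// Display the extremes; this confirms the IDs fit comfortably in an Int,
// which is why the toInt parsing above is safe for this dataset
userArtistMinMax.show()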
val rawArtistData = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/artist_data.txt")
// First attempt: split each line at the tab into (id, name); note that span
// takes a Char predicate. This leaves raw strings and mishandles malformed
// lines, which the robust version below fixes.
val rawArtistByID = rawArtistData.map { line =>
  val (id, name) = line.span(_ != '\t')
  (id, name)
}
// Robust version: drop lines with a missing name or a non-numeric ID
val rawArtistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case _: NumberFormatException => None
    }
  }
}.toDF("id", "name")
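// Spot-check the parsed artist names (show(5) is an arbitrary sample size)
rawArtistByID.show(5)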
val rawArtistAlias = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/artist_alias.txt")
// Map each misspelled/nonstandard artist ID to its canonical ID, skipping
// lines where the first field is empty
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(artist, alias) = line.split('\t')
  if (artist.isEmpty) {
    None
  } else {
    Some((artist.toInt, alias.toInt))
  }
}.collect().toMap
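// artistAlias is now a plain local Map, so lookups don't touch the cluster;
// e.g. print one (bad ID -> canonical ID) entry (values depend on the dataset)
artistAlias.take(1).foreach { case (bad, good) => println(s"$bad -> $good") }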
// Check one known alias pair by name
rawArtistByID.filter($"id".isin(1208690, 1003926)).show()
//=====================================================
import org.apache.spark.sql._
import org.apache.spark.broadcast._
// Build (user, artist, count) rows, rewriting aliased artist IDs to their
// canonical IDs via the broadcast map
def buildCounts(rawUserArtistData: Dataset[String], bArtistAlias: Broadcast[Map[Int, Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(" ").map(_.toInt)
    val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
    (userID, finalArtistID, count)
  }.toDF("user", "artist", "count")
}
// Broadcast the alias map once so every executor reuses a single copy
val bArtistAlias = spark.sparkContext.broadcast(artistAlias)
val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()
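// cache() is lazy; a count() forces the parse so the ALS iterations below
// reuse the cached rows instead of re-reading the text file each pass
trainData.count()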
import org.apache.spark.ml.recommendation._
import scala.util.Random
// Train an implicit-feedback ALS model on the play counts
val model = new ALS().
  setSeed(Random.nextLong()).
  setImplicitPrefs(true).
  setRank(10).
  setRegParam(0.01).
  setAlpha(1.0).
  setMaxIter(5).
  setUserCol("user").
  setItemCol("artist").
  setRatingCol("count").
  setPredictionCol("prediction").
  fit(trainData)
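// Peek at one learned latent-factor vector; userFactors and itemFactors are
// DataFrames on the fitted ALSModel (truncate = false prints the full array)
model.userFactors.show(1, truncate = false)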
// User ID => artist IDs => artist names
val userID = 2093760
// Artists this user already listened to in the training data
val existingArtistIDs = trainData.filter(col("user") === userID).select("artist").as[Int].collect()
rawArtistByID.filter($"id".isin(existingArtistIDs: _*)).show()