repo2kanagaraj
8/1/2017 - 9:49 AM

Spark learning

import org.apache.spark.sql.SparkSession

// Sample Session Creation

val spark = SparkSession
            .builder()
            .appName("Session Creation Example")
            .config("spark.some.config.option", "value")
            .getOrCreate()
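
// Note: in spark-shell a SparkSession named `spark` already exists. Either way,
// the implicits import below enables the $-column syntax and the tuple-to-DataFrame
// conversions (toDF) used in the rest of these notes.
import spark.implicits._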

// Example Code for spark
// "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"

// Reading data from a JSON file
val df = spark.read.json("examples/src/main/resources/people.json")

// Display the content of the DataFrame to stdout
df.show()
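
// The referenced Spark SQL example also inspects the inferred schema and selects
// columns; a quick sketch (assumes the sample people.json with name/age fields)
df.printSchema()
df.select("name").show()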

// Chapter 3:

// Preparing the dataset

val rawUserArtistData = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/user_artist_data.txt")

rawUserArtistData.take(5).foreach(println)

// Parse each space-delimited line into a (user, artist) ID pair
val userArtistDF = rawUserArtistData.map { line =>
  val Array(user, artist, _*) = line.split(" ")
  (user.toInt, artist.toInt)
}.toDF("user", "artist")

import org.apache.spark.sql.functions.{col, max, min}

// Find the min/max user and artist IDs (useful to confirm they fit in an Int, as ALS requires)
val userArtistMinMax = userArtistDF.agg(
  max("user").as("max_user"), min("user").as("min_user"),
  min("artist").as("min_artist"), max("artist").as("max_artist"))
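
// agg returns a one-row DataFrame; show it to inspect the actual ID ranges
userArtistMinMax.show()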

val rawArtistData = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/artist_data.txt")

// First attempt: split each line at the tab character. This keeps the IDs as
// strings and does not handle malformed lines; see the flatMap version below.
val rawArtistByID = rawArtistData.map { line =>
  val (id, name) = line.span(_ != '\t')
  (id, name)
}

// Safer parse: skip lines with an empty name or a non-numeric ID
val rawArtistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case _: NumberFormatException => None
    }
  }
}.toDF("id", "name")


val rawArtistAlias = spark.read.textFile("file:///home/kanagaraj/drive/personal/exploring/spark/linkage/profiledata_06-May-2005/artist_alias.txt")

// Build a map from variant artist IDs to canonical IDs,
// skipping lines where the artist field is empty
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(artist, alias) = line.split('\t')
  if (artist.isEmpty) {
    None
  } else {
    Some((artist.toInt, alias.toInt))
  }
}.collect().toMap


rawArtistByID.filter($"id" isin (1208690,1003926)).show()



//=====================================================

import org.apache.spark.sql._
import org.apache.spark.broadcast._

// Convert the raw play data into (user, artist, count) rows, resolving
// aliased artist IDs to their canonical IDs via the broadcast map
def buildCounts(rawUserArtistData: Dataset[String], bArtistAlias: Broadcast[Map[Int, Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(" ").map(_.toInt)
    val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
    (userID, finalArtistID, count)
  }.toDF("user", "artist", "count")
}

// Broadcast the alias map so each executor receives a single read-only copy
val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()
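
// cache() is lazy; an action such as count forces the data to be materialized
// before training (optional, but makes the caching effect visible)
trainData.count()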

import org.apache.spark.ml.recommendation._
import scala.util.Random
// Fit an implicit-feedback ALS model on the play counts
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).
setRegParam(0.01).
setAlpha(1.0).
setMaxIter(5).
setUserCol("user").
setItemCol("artist").
setRatingCol("count").
setPredictionCol("prediction").
fit(trainData)
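
// Peek at the learned latent factors (one row each, untruncated)
model.userFactors.show(1, truncate = false)
model.itemFactors.show(1, truncate = false)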

// User ID => artist IDs => artist names: which artists has this user already played?

val userID = 2093760

val existingArtistIDs = trainData.filter(col("user") === userID).select("artist").as[Int].collect()

rawArtistByID.filter($"id" isin (existingArtistIDs:_*)).show()