cloudcalvin
8/16/2015 - 10:43 PM

JSON Integration with Spark SQL and Cassandra

JSON Integration with Spark SQL and Cassandra

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}

/** Spark SQL JSON support combined with the Spark Cassandra Connector.
  *
  * The program:
  *  1. drops and recreates the `githubstats` keyspace and its `monthly_commits` table,
  *  2. parses two in-memory JSON documents with Spark SQL,
  *  3. converts every parsed row with `MonthlyCommits(_)` and saves the result to Cassandra,
  *  4. reads the table back and prints each row.
  *
  * An explicit `main` is used instead of extending `scala.App`: Spark's documentation
  * warns that subclasses of `scala.App` may not work correctly.
  */
object SampleJson {
  import com.datastax.spark.connector._
  import GitHubEvents._

  /** Program entry point.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setMaster("local[*]")
      .setAppName("app2")

    // Start from a clean schema on every run: any existing keyspace is dropped first.
    // NOTE(review): the table declares a `date` column while the JSON documents below carry
    // `month` and `year`. `MonthlyCommits` is defined outside this file section — confirm
    // that its fields line up with these columns.
    CassandraConnector(conf).withSessionDo { session =>
      session.execute("DROP KEYSPACE IF EXISTS githubstats")
      session.execute("CREATE KEYSPACE githubstats WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute("CREATE TABLE githubstats.monthly_commits (user VARCHAR PRIMARY KEY, commits INT, date INT)")
    }

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val json = sc.parallelize(Seq(
        """{"user":"helena","commits":98, "month":12, "year":2014}""",
        """{"user":"pkolaczk", "commits":42, "month":12, "year":2014}"""))

      // JSON strings -> Spark SQL rows -> MonthlyCommits -> Cassandra.
      sqlContext.jsonRDD(json)
        .map(MonthlyCommits(_))
        .saveToCassandra("githubstats", "monthly_commits")

      // Read the table back and print every row on the driver.
      sc.cassandraTable[MonthlyCommits]("githubstats", "monthly_commits")
        .collect()
        .foreach(println)
    } finally {
      // Stop the context even when the save or the read above fails.
      sc.stop()
    }
  }
}