JSON Integration with Spark SQL and Cassandra
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/** Spark SQL: text, Parquet, and JSON support with the Spark Cassandra Connector. */
object SampleJson extends App {

  import com.datastax.spark.connector._
  import GitHubEvents._

  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setMaster("local[*]")
    .setAppName("app2")

  // Recreate the schema so the sample can be run repeatedly against a local node.
  CassandraConnector(conf).withSessionDo { session =>
    session.execute("DROP KEYSPACE IF EXISTS githubstats")
    session.execute("CREATE KEYSPACE githubstats WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    session.execute("CREATE TABLE githubstats.monthly_commits (user VARCHAR PRIMARY KEY, commits INT, date INT)")
  }

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Two sample GitHub commit events as raw JSON strings.
  val json = sc.parallelize(Seq(
    """{"user":"helena","commits":98, "month":12, "year":2014}""",
    """{"user":"pkolaczk", "commits":42, "month":12, "year":2014}"""))

  // Let Spark SQL infer the schema from the JSON, map each Row to a
  // MonthlyCommits, and save the result to the Cassandra table.
  sqlContext.jsonRDD(json).map(MonthlyCommits(_)).saveToCassandra("githubstats", "monthly_commits")

  // Read the rows back from Cassandra as MonthlyCommits and print them.
  sc.cassandraTable[MonthlyCommits]("githubstats", "monthly_commits").collect().foreach(println)

  sc.stop()
}
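
The GitHubEvents object imported above is defined elsewhere and not shown in this listing. For the sample to compile it needs a MonthlyCommits case class whose fields mirror the monthly_commits table, plus a companion apply that builds an instance from a Spark SQL Row. A minimal sketch, assuming Spark's JSON schema inference (which orders inferred fields alphabetically and reads JSON integers as Long) and assuming the month value is what gets stored in the table's date column:

import org.apache.spark.sql.Row

object GitHubEvents {

  /** Mirrors githubstats.monthly_commits (user VARCHAR PRIMARY KEY, commits INT, date INT). */
  case class MonthlyCommits(user: String, commits: Int, date: Int) extends Serializable

  object MonthlyCommits {
    /** The inferred JSON schema is alphabetical -- commits(0), month(1), user(2), year(3) --
      * with integer values read as Long. Storing month in date is an assumption of this sketch. */
    def apply(row: Row): MonthlyCommits =
      MonthlyCommits(row.getString(2), row.getLong(0).toInt, row.getLong(1).toInt)
  }
}

The connector maps case class fields to columns by name in both directions, so the same class serves saveToCassandra on the write side and cassandraTable[MonthlyCommits] on the read side.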