// scala-spark-file-formats.scala

// This is a script, not a standalone program
// Run these commands from the Spark shell (spark-shell)

// Writing data as a sequence file
import org.apache.hadoop.io._
val products = sc.textFile("/public/retail_db/products")
products.map(rec => (NullWritable.get(), rec)).
  saveAsSequenceFile("/user/dgadiraju/products_seq")
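
// Spark also converts Scala primitives to Writables implicitly, so the
// product id can be kept as the key. A minimal sketch; the products_seq_int
// output path is an assumption, and the first field is assumed to be a
// numeric id, as in retail_db.
products.
  map(rec => (rec.split(",")(0).toInt, rec)).
  saveAsSequenceFile("/user/dgadiraju/products_seq_int")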

// Reading sequence files
sc.sequenceFile("/user/dgadiraju/products_seq", classOf[NullWritable], classOf[Text]).
  map(rec => rec._2.toString()).
  collect().
  foreach(println)
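
// Alternative read: the type-parameter variant of sequenceFile relies on
// Spark's implicit WritableConverters (String stands in for Text), so no
// classOf arguments are needed. A sketch against the directory written above.
sc.sequenceFile[NullWritable, String]("/user/dgadiraju/products_seq").
  map(rec => rec._2).
  take(10).
  foreach(println)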

// Writing using saveAsNewAPIHadoopFile (works for any Hadoop new-API OutputFormat)
val products = sc.textFile("/public/retail_db/products")
val productsMap = products.
  map(rec => (new IntWritable(rec.split(",")(0).toInt), new Text(rec)))

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output._

// Note: saveAsNewAPIHadoopFile fails if the target directory already exists;
// remove the earlier output first (hadoop fs -rm -R /user/dgadiraju/products_seq)
productsMap.
  saveAsNewAPIHadoopFile("/user/dgadiraju/products_seq", classOf[IntWritable], classOf[Text], classOf[SequenceFileOutputFormat[IntWritable, Text]])

// Reading using newAPIHadoopFile (works for any Hadoop new-API InputFormat)
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input._

// Returns an RDD[(IntWritable, Text)]
sc.newAPIHadoopFile("/user/dgadiraju/products_seq", classOf[SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text])

sc.newAPIHadoopFile("/user/dgadiraju/products_seq", classOf[SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text]).
  map(rec => rec.toString()).
  collect().
  foreach(println)
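
// Hadoop reuses Writable objects across records, so copy the values out
// (get()/toString) before collecting; a minimal sketch converting each
// record to plain Scala types.
sc.newAPIHadoopFile("/user/dgadiraju/products_seq", classOf[SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text]).
  map(rec => (rec._1.get(), rec._2.toString)).
  take(10).
  foreach(println)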