// cca175-problem-04-file-formats.scala
// Author: dgadiraju, 11/4/2017

// Step 1: import the orders table as a tab-delimited text file.
sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --password cloudera \
  --username retail_dba \
  --table orders \
  --as-textfile \
  --fields-terminated-by '\t' \
  --target-dir /user/cloudera/problem5/text -m 1
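
// Optional sanity check: with a single mapper the import should produce
// part-m-00000 (exact file names can vary); peek at a few tab-delimited rows.
hadoop fs -ls /user/cloudera/problem5/text
hadoop fs -cat /user/cloudera/problem5/text/part-m-00000 | head -5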

// Step 2: import the orders table as an Avro data file.
sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --password cloudera \
  --username retail_dba \
  --table orders \
  --as-avrodatafile \
  --target-dir /user/cloudera/problem5/avro \
  -m 1
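
// Optional check: the Avro import should produce *.avro part files.
hadoop fs -ls /user/cloudera/problem5/avro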

// Step 3: import the orders table as a Parquet file.
sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --password cloudera \
  --username retail_dba \
  --table orders \
  --as-parquetfile \
  --target-dir /user/cloudera/problem5/parquet \
  -m 1
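
// Optional check from the spark shell: read the Parquet files back and
// eyeball a few rows.
sqlContext.read.parquet("/user/cloudera/problem5/parquet").show(5)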

// Step 4: read the Avro data and write it back out as snappy-compressed
// Parquet, gzip-compressed text, and an uncompressed sequence file.
import com.databricks.spark.avro._
var dataFile = sqlContext.read.avro("/user/cloudera/problem5/avro");
sqlContext.setConf("spark.sql.parquet.compression.codec","snappy");
dataFile.repartition(1).write.parquet("/user/cloudera/problem5/parquet-snappy-compress");
dataFile.
  map(x=> x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3)).
  saveAsTextFile("/user/cloudera/problem5/text-gzip-compress", classOf[org.apache.hadoop.io.compress.GzipCodec]);
dataFile.
  map(x=> (x(0).toString,x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3))).
  saveAsSequenceFile("/user/cloudera/problem5/sequence");

// The command below may fail on some Cloudera VMs (often because the native
// Snappy libraries are missing). If the Spark command fails, use the sqoop
// command instead. Remember you need to exit the spark shell to run the
// sqoop command.
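
// A quick way to check whether this VM's Hadoop build has native Snappy
// support before trying the Spark command (look for "snappy: true"):
hadoop checknative -a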

dataFile.
  map(x=> x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3)).
  saveAsTextFile("/user/cloudera/problem5/text-snappy-compress", classOf[org.apache.hadoop.io.compress.SnappyCodec]);

sqoop import --table orders \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username retail_dba \
  --password cloudera \
  --as-textfile \
  -m 1 \
  --target-dir /user/cloudera/problem5/text-snappy-compress \
  --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec
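
// Optional check: with --compress the text part files should carry a
// .snappy extension.
hadoop fs -ls /user/cloudera/problem5/text-snappy-compress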

// Step 5: read the snappy-compressed Parquet data and write it back as
// uncompressed Parquet and as snappy-compressed Avro.
var parquetDataFile = sqlContext.read.parquet("/user/cloudera/problem5/parquet-snappy-compress")
sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed");
parquetDataFile.write.parquet("/user/cloudera/problem5/parquet-no-compress");
sqlContext.setConf("spark.sql.avro.compression.codec","snappy");

parquetDataFile.write.avro("/user/cloudera/problem5/avro-snappy");
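
// Optional check (run outside the spark shell): compare the on-disk size of
// the snappy-compressed and uncompressed Parquet output.
hadoop fs -du -h /user/cloudera/problem5/parquet-snappy-compress /user/cloudera/problem5/parquet-no-compress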

// Step 6: read the snappy-compressed Avro data and write it as uncompressed
// JSON and as gzip-compressed JSON.
var avroData = sqlContext.read.avro("/user/cloudera/problem5/avro-snappy");
avroData.toJSON.saveAsTextFile("/user/cloudera/problem5/json-no-compress");
avroData.toJSON.saveAsTextFile("/user/cloudera/problem5/json-gzip", classOf[org.apache.hadoop.io.GzipCodec]);

// Step 7: read the gzipped JSON back and write it out as gzip-compressed CSV.
var jsonData = sqlContext.read.json("/user/cloudera/problem5/json-gzip");
jsonData.
  map(x=>x(0)+","+x(1)+","+x(2)+","+x(3)).
  saveAsTextFile("/user/cloudera/problem5/csv-gzip", classOf[org.apache.hadoop.io.compress.GzipCodec])

// Step 8: convert the sequence file to ORC.
// To read a sequence file you first need to know the key and value classes
// it was written with, so you can supply them when loading the file as a
// Spark RDD.
// In a new terminal, copy the sequence file to the local file system:
hadoop fs -get /user/cloudera/problem5/sequence/part-00000
// Read the first 300 characters to identify the key and value classes:
cut -c-300 part-00000
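
// The SequenceFile header begins with the magic bytes "SEQ" followed by the
// key and value class names, so you should see something like
// "org.apache.hadoop.io.Text" twice: once for the key class, once for the
// value class.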

// Back in the spark shell, load the sequence file and convert it:
var seqData = sc.
  sequenceFile("/user/cloudera/problem5/sequence/", classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text]);
seqData.
  map(x => {
    var d = x._2.toString.split("\t"); (d(0),d(1),d(2),d(3))
  }).
  toDF().
  write.
  orc("/user/cloudera/problem5/orc");

// Contribution from Raphael L. Nascimento: an equivalent way to set the
// Parquet compression codec from SQL.
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")