s4553711
8/28/2015 - 7:28 AM

This code loads data from S3 into a Spark RDD in parallel and supports multiple input paths. It is based on http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219: the S3 client is declared as a def so that each executor builds its own client instead of deserializing one shipped from the driver, which is the failure mode that article describes.

// Comma-separated list of S3 paths; each may point at a single object or a key prefix (directory)
val s3Paths = "s3://yourbucket/path/to/file1.txt,s3://yourbucket/path/to/directory"
// Number of keys to request per S3 list call
val pageLength = 100
val key = "YOURKEY"
val secret = "YOUR_SECRET"

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials
import scala.collection.JavaConverters._
import scala.io.Source
import java.io.InputStream
import org.apache.spark.rdd.RDD

// A def (not a val): a new client is constructed on every call, so each executor
// builds its own client rather than deserializing one shipped from the driver
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))
var inputLinesRDD_raw: RDD[String] = sc.emptyRDD[String]
s3Paths.split(",").foreach { s3Path =>
  val regex = """(?i)^s3://([^/]+)/(.*)""".r
  // Fail fast on an unparseable path instead of carrying nulls forward
  val (bucket, prefix) = s3Path.trim match {
    case regex(b, p) => (b, p)
    case _           => sys.error(s"Cannot parse S3 path: $s3Path")
  }
  println(s"Processing s3 resource: bucket '$bucket', prefix '$prefix'")
  // Page through the bucket listing, pageLength keys at a time
  val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)
  var listing = s3.listObjects(request)
  var proceed = true
  while (proceed) {
    val s3FileKeys = listing.getObjectSummaries.asScala.map(_.getKey).toList
    // Distribute this page of keys; each task builds its own client via the s3 def.
    // The parameter is named fileKey to avoid shadowing the credentials val `key`.
    val inputLines = sc.parallelize(s3FileKeys).flatMap { fileKey =>
      Source.fromInputStream(s3.getObject(bucket, fileKey).getObjectContent: InputStream).getLines
    }
    inputLinesRDD_raw = inputLinesRDD_raw.union(inputLines)
    // Checking isTruncated avoids one extra (empty) list request at the end
    if (listing.isTruncated) listing = s3.listNextBatchOfObjects(listing)
    else proceed = false
  }
}

// TODO do something with inputLinesRDD_raw
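
A minimal sketch of downstream use (the actions shown are illustrative, not part of the original): cache the RDD before running multiple actions, since the unioned lineage would otherwise re-read every object from S3 on each one.

// Illustrative only: cache, then inspect the loaded lines
inputLinesRDD_raw.cache()
println(s"Total lines loaded: ${inputLinesRDD_raw.count()}")
inputLinesRDD_raw.take(5).foreach(println)

With many listing pages, the repeated pairwise union also builds a long lineage; collecting the per-page RDDs into a buffer and combining them with a single sc.union keeps it flat.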