This code loads data from S3 into a Spark RDD in parallel and supports multiple comma-separated paths. It assumes an existing SparkContext `sc` (e.g. in spark-shell). Based on http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
val s3Paths = "s3://yourbucket/path/to/file1.txt,s3://yourbucket/path/to/directory" // comma-separated S3 paths (single files or prefixes)
val pageLength = 100 // max number of keys returned per S3 listing page
val key = "YOURKEY" // AWS access key id
val secret = "YOUR_SECRET" // AWS secret access key
import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.model.ObjectListing
import scala.collection.JavaConverters._
import scala.io.Source
import java.io.InputStream
import org.apache.spark.rdd.RDD
// The client is a `def` rather than a `val`: a fresh client is created wherever it is
// referenced (including inside executor closures), so no non-serializable client instance
// has to be captured and shipped with the Spark closure.
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

// Accumulates the lines read from every listed S3 object.
var inputLinesRDD_raw: RDD[String] = sc.emptyRDD[String]
s3Paths.split(",").foreach{ s3Path =>
val regex = """(?i)^s3://([^/]+)/(.*)""".r
val bucket = regex.findFirstMatchIn(s3Path).map(_ group 1).getOrElse(null)
val prefix = regex.findFirstMatchIn(s3Path).map(_ group 2).getOrElse(null)
println("Processing s3 resource: bucket '%s', prefix '%s'".format(bucket, prefix))
@transient val request = new ListObjectsRequest()
request.setBucketName(bucket)
request.setPrefix(prefix)
request.setMaxKeys(pageLength)
@transient var listing = s3.listObjects(request)
var proceed = true
while (proceed){
if (listing.getObjectSummaries.isEmpty){
proceed = false
}else{
@transient val s3FileKeys = listing.getObjectSummaries.asScala.map(_.getKey).toList
val inputLines = sc.parallelize(s3FileKeys).flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
inputLinesRDD_raw = inputLinesRDD_raw.union(inputLines)
listing = s3.listNextBatchOfObjects(listing)
}
}
}
// TODO do something with inputLinesRDD_raw
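// Example usage (a minimal sketch, not part of the original snippet): materialise the RDD
// and report how many lines were loaded; replace this with your own transformations.
inputLinesRDD_raw.cache()
println("Loaded %d lines from S3".format(inputLinesRDD_raw.count()))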