Spark custom data merge
Result files written by the job below, each DataFrame Row serialized as pipe-delimited (value, length) pairs:
premiere|8|ligne|5|troisieme|9|quatrieme|9
hello there|11|i'm here|8|or am i not?|12|i forgot|8
Content of one of the input text files under /user/sderosiaux/sparkdata:
premiere
ligne
troisieme
quatrieme
fifth?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Main extends App {
  val config = new SparkConf().setAppName("std test")
  val context = new SparkContext(config)
  val sqlContext = new SQLContext(context)

  // read through the custom data source
  val df = sqlContext.read.format("com.powerspace.test").load("/user/sderosiaux/sparkdata")

  // debug: expose the DataFrame as a table and dump its content
  df.registerTempTable("users")
  sqlContext.sql("select * from users").show()

  // write the df back through the same data source
  df.write.format("com.powerspace.test").save("/user/sderosiaux/sparkdataresults")
}
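Spark resolves the format string by looking for a DefaultSource class in the given package, so "com.powerspace.test" points at the com.powerspace.test.DefaultSource defined below. The path passed to load() simply becomes the "path" entry of the parameters map the source receives; as a small sketch, the read above could also be spelled out with an explicit option:

// equivalent read: load(path) is shorthand for setting the "path" option,
// which DefaultSource later fetches with parameters.getOrElse("path", "")
val df = sqlContext.read
  .format("com.powerspace.test")
  .option("path", "/user/sderosiaux/sparkdata")
  .load()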
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

class DefaultSource extends RelationProvider with SchemaRelationProvider
  with Logging with CreatableRelationProvider {

  // read path, no user-provided schema
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    logInfo("createRelation of RelationProvider")
    createRelation(sqlContext, parameters, null)
  }

  // write path: serialize each Row as one pipe-delimited line
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    logInfo(s"createRelation of CreatableRelationProvider ($parameters)")
    data.rdd
      .map(_.toSeq.map(_.toString).mkString("|"))
      .saveAsTextFile(parameters.getOrElse("path", ""))
    createRelation(sqlContext, parameters, null)
  }

  // read path with a schema (the schema is ignored, CustomRelation provides its own)
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    logInfo("createRelation of SchemaRelationProvider")
    val location = parameters.getOrElse("path", "")
    new CustomRelation(location, sqlContext)
  }
}
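The write branch is what produced the pipe-delimited result lines at the top: every Row is flattened to strings and joined with "|". A standalone trace of that serialization on the second sample row (illustration only, not part of the job):

import org.apache.spark.sql.Row

val row = Row("hello there", 11, "i'm here", 8, "or am i not?", 12, "i forgot", 8)
// same expression as in the CreatableRelationProvider branch above
row.toSeq.map(_.toString).mkString("|")
// => hello there|11|i'm here|8|or am i not?|12|i forgot|8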
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._

class CustomRelation(val location: String, @transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan with Logging {

  // four (line content, line length) pairs per row
  def schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("namel", IntegerType, nullable = true),
    StructField("name2", StringType, nullable = true),
    StructField("name2l", IntegerType, nullable = true),
    StructField("name3", StringType, nullable = true),
    StructField("name3l", IntegerType, nullable = true),
    StructField("name4", StringType, nullable = true),
    StructField("name4l", IntegerType, nullable = true)
  ))

  // one Row per input file: each line followed by its length
  def buildScan(): RDD[Row] = {
    val rdd = sqlContext.sparkContext.wholeTextFiles(location).map { case (_, content) => content }
    rdd.map { file =>
      val lines = file.split("\n")
      Row.fromSeq(lines.zip(lines.map(_.length)).flatMap { case (line, length) => List(line, length) })
    }
  }
}
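Since wholeTextFiles yields one (path, content) pair per file, buildScan emits one Row per input file, interleaving each line with its length. A quick standalone trace on the first four lines of the sample input file above:

import org.apache.spark.sql.Row

val content = "premiere\nligne\ntroisieme\nquatrieme"
val lines = content.split("\n")
val row = Row.fromSeq(lines.zip(lines.map(_.length)).flatMap { case (l, len) => List(l, len) })
// row: [premiere,8,ligne,5,troisieme,9,quatrieme,9]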