chtefi
4/20/2016 - 11:06 PM

Spark custom data merge

A minimal custom Spark SQL data source: it reads a folder of raw text files, turns each file into one row of (line, line length) pairs, and writes such rows back out as |-separated text lines.

Sample result (one |-separated row per input file, each line followed by its length):
premiere|8|ligne|5|troisieme|9|quatrieme|9
hello there|11|i'm here|8|or am i not?|12|i forgot|8
A sample input file (raw text, one value per line):
premiere
ligne
troisieme
quatrieme
fifth?
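The code below targets the Spark 1.x API (SQLContext, registerTempTable). A minimal sbt setup could look like the following sketch; the project name and exact versions are assumptions, not part of the original gist:

// build.sbt (sketch; name and versions are assumed)
name := "spark-custom-datasource"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"
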
// the data source class below must live in the package referenced by .format(...):
// Spark resolves "com.powerspace.test" to the class com.powerspace.test.DefaultSource
package com.powerspace.test

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

object Main extends App {
  val config = new SparkConf().setAppName("std test")
  val context = new SparkContext(config)
  val sqlContext = new SQLContext(context)

  val df = sqlContext.read.format("com.powerspace.test").load("/user/sderosiaux/sparkdata")

  // debug: register the DataFrame as a temp table and print it to check the read path
  df.registerTempTable("users")
  sqlContext.sql("select * from users").show()

  // exercise the write path: save the DataFrame back through the same custom source
  df.write.format("com.powerspace.test").save("/user/sderosiaux/sparkdataresults")

}

// the class Spark instantiates when .format("com.powerspace.test") is used
class DefaultSource extends RelationProvider with SchemaRelationProvider with Logging with CreatableRelationProvider {
  // read path without a user-provided schema
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    logInfo("createRelation of RelationProvider")
    createRelation(sqlContext, parameters, null) // the schema argument is ignored anyway
  }

  // write path: called by df.write.format(...).save(path); "path" arrives in the parameters map
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    logInfo(s"createRelation of CreatableRelationProvider ($parameters)")
    // serialize each Row as one |-separated text line (the SaveMode is ignored here)
    data.rdd
      .map(_.toSeq.map(_.toString).mkString("|"))
      .saveAsTextFile(parameters.getOrElse("path", ""))
    createRelation(sqlContext, parameters, null)
  }

  // read path: any user-provided schema is ignored, CustomRelation hard-codes its own
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    logInfo("createRelation of SchemaRelationProvider")
    val location = parameters.getOrElse("path", "")
    new CustomRelation(location, sqlContext)
  }
  }
}
// TableScan relation with a fixed schema; the SQLContext is @transient to keep the relation serializable
class CustomRelation(val location: String, @transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan with Logging {

  // fixed schema: 4 (value, length) pairs per row, i.e. "nameN" holds a line and "nameNl" its length
  override def schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("namel", IntegerType, nullable = true),
    StructField("name2", StringType, nullable = true),
    StructField("name2l", IntegerType, nullable = true),
    StructField("name3", StringType, nullable = true),
    StructField("name3l", IntegerType, nullable = true),
    StructField("name4", StringType, nullable = true),
    StructField("name4l", IntegerType, nullable = true)
  ))

  override def buildScan(): RDD[Row] = {
    // one record per file (wholeTextFiles), keeping only the content
    val rdd = sqlContext.sparkContext.wholeTextFiles(location).map { case (path, content) => content }
    rdd.map(file => {
      // one Row per file: each line followed by its length, so a file is expected
      // to contain 4 lines to match the 8-column schema above
      val lines = file.split("\n")
      Row.fromSeq(lines.zip(lines.map(_.length)).flatMap(t => List(t._1, t._2)))
    })
  }
}
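
To make the per-file mapping in buildScan concrete, here is a quick plain-Scala check (no Spark needed), using the first four lines of the sample input file above; it is only a sketch to illustrate the Row contents:

val file = "premiere\nligne\ntroisieme\nquatrieme"
val lines = file.split("\n")
val values = lines.zip(lines.map(_.length)).flatMap(t => List(t._1, t._2))
// values: Array(premiere, 8, ligne, 5, troisieme, 9, quatrieme, 9)
// which matches the first |-separated row shown in the sample result above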