chtefi
5/27/2016 - 11:15 PM

Small Spark how-to


  import org.apache.commons.io.FilenameUtils
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf().setMaster("local[*]").setAppName("My app")
  val sc = new SparkContext(conf)
  // make the input formats traverse sub-directories
  sc.hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
  val PATH = """C:\tmp\js"""

  // total content size (in characters) of the .js files
  // the implicit String => Boolean conversion lets filter(_._1) test the file name directly
  implicit def isJs(name: String): Boolean = name.endsWith(".js")
  println(sc.wholeTextFiles(PATH).filter(_._1).aggregate(0)(_ + _._2.length, _ + _))

  // number of files per extension
  sc.wholeTextFiles(PATH).keys.groupBy(FilenameUtils.getExtension).map(f => (f._1, f._2.size)).foreach(println)
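
A variation on the last line, assuming the same sc, PATH and FilenameUtils import: countByValue collects the per-extension counts directly to the driver as a Map, without building an Iterable of paths per extension.

  // same per-extension count, using countByValue instead of groupBy + size
  sc.wholeTextFiles(PATH).keys.map(FilenameUtils.getExtension).countByValue().foreach(println)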

  // parse every line of a JSON log file into a Log case class
  import com.fasterxml.jackson.annotation.JsonProperty
  import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
  import org.apache.spark.rdd.RDD

  // @JsonProperty on the constructor parameters lets Jackson instantiate the case class without the Scala module
  case class Log(@JsonProperty("type") `type`: String, @JsonProperty("message") message: String, @JsonProperty("level") level: String)
  val mapper = new ObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  val rdd: RDD[String] = sc.textFile("""C:\temp\js.log""")
  val rdd2: RDD[Log] = rdd.map(mapper.readValue(_, classOf[Log]))
  // count the log lines per level
  val rdd3: RDD[(String, Iterable[Log])] = rdd2.groupBy(_.level)
  val rdd4: RDD[(String, Int)] = rdd3.map(t => (t._1, t._2.size))
  rdd4.foreach(println)
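
The mapper above is captured by the map closure and shipped with each task; a common variation is to build one ObjectMapper per partition with mapPartitions instead. A minimal sketch, reusing the rdd, Log and Jackson imports from above (logsPerLevel is just an illustrative name):

  // one ObjectMapper per partition, then a simple count of log lines per level
  val logsPerLevel = rdd.mapPartitions { lines =>
    val localMapper = new ObjectMapper
    localMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    lines.map(localMapper.readValue(_, classOf[Log]))
  }.map(_.level).countByValue()
  logsPerLevel.foreach(println)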