Small Spark how-to

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.language.implicitConversions

// Run Spark locally, using all available cores.
val conf = new SparkConf().setMaster("local[*]").setAppName("My app")
val sc = new SparkContext(conf)
// Make wholeTextFiles/textFile descend into subdirectories.
sc.hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
val PATH = """C:\tmp\js"""
// Total number of lines across all .js files.
// The implicit String => Boolean conversion lets filter(_._1) test each file name directly.
implicit def isJs(name: String): Boolean = name.endsWith(".js")
println(sc.wholeTextFiles(PATH).filter(_._1).aggregate(0)(_ + _._2.split("\n").length, _ + _))
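// Without the implicit-conversion trick, an explicit equivalent (a sketch; jsLineCount is
// just an illustrative name) could look like this:
val jsLineCount = sc.wholeTextFiles(PATH)
  .filter { case (name, _) => name.endsWith(".js") }
  .map { case (_, content) => content.split("\n").length }
  .sum()
  .toLong
println(jsLineCount)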
// files per extension
sc.wholeTextFiles(PATH).keys.groupBy(FilenameUtils.getExtension).map(f => (f._1, f._2.size)).foreach(println)
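// Alternative sketch: countByValue collects the per-extension counts straight to the
// driver as a Map[String, Long], which is convenient when there are few distinct extensions.
println(sc.wholeTextFiles(PATH).keys.map(name => FilenameUtils.getExtension(name)).countByValue())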
// parse every line into a Log case class
case class Log(@JsonProperty("type") `type`: String, @JsonProperty("message") message: String, @JsonProperty("level") level: String)
val mapper = new ObjectMapper
// jackson-module-scala (assumed to be on the classpath) lets Jackson instantiate Scala case classes.
mapper.registerModule(DefaultScalaModule)
// Ignore any JSON fields that are not part of the Log case class.
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val rdd: RDD[String] = sc.textFile("""C:\temp\js.log""")
val rdd2: RDD[Log] = rdd.map(mapper.readValue(_, classOf[Log]))
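// Variant (a sketch): build the ObjectMapper once per partition instead of shipping the
// driver-side instance to every task; rddParsed is just an illustrative name.
val rddParsed: RDD[Log] = rdd.mapPartitions { lines =>
  val m = new ObjectMapper
  m.registerModule(DefaultScalaModule)
  m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  lines.map(m.readValue(_, classOf[Log]))
}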
val rdd3: RDD[(String, Iterable[Log])] = rdd2.groupBy(_.level)
val rdd4 = rdd3.map(t => (t._1, t._2.size))
rdd4.foreach(println)
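// When only the counts per level are needed, reduceByKey is a lighter alternative to the
// groupBy above: it sums partial counts instead of collecting every Log per level (a sketch):
rdd2.map(log => (log.level, 1L)).reduceByKey(_ + _).foreach(println)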