bpg
12/9/2017 - 12:56 AM

A complete server using Akka Streams that reads from a source, batches its data and then publishes it. If the data cannot be published, it backs off and makes a best-effort attempt to send that data again.

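import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, RestartFlow, Sink, Source, StreamConverters}
import scala.concurrent.duration._

// Assumes an implicit Materializer and ExecutionContext are in scope, and that
// SoilStateReading and responder are defined elsewhere.

// Side channel for readings whose publish failed. When its 100-element buffer is
// full, dropTail discards the most recently buffered reading to make room.
// prefixAndTail(0) exposes the queue's output as a Future[Source], so both the
// queue handle and its source are available after materialization.
val (recycleQueue, recycleSource) =
  Source
    .queue[SoilStateReading](100, OverflowStrategy.dropTail)
    .prefixAndTail(0)
    .map(_._2)
    .toMat(Sink.head)(Keep.both)
    .run()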
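
// Read the sensor log from the classpath and parse it into SoilStateReading elements.
StreamConverters.fromInputStream(() => this.getClass.getClassLoader.getResourceAsStream("sensors.log"))
  .via(SoilStateReading.csvParser)
  // Feed previously failed readings back into the stream.
  .merge(Source.fromFutureSource(recycleSource))
  // While the publisher is busy, collect up to 100 readings per batch (newest first).
  .batch(100, e => List(e))((a, e) => e +: a)
  // Restart the publishing flow with backoff (1 to 3 seconds, 20% jitter) whenever it fails.
  .via(RestartFlow.withBackoff(1.second, 3.seconds, 0.2) { () =>
    Flow[Seq[SoilStateReading]]
      .mapAsync(1) { readings =>
        responder
          .post(readings)
          .recover {
            case e: IllegalStateException =>
              // Re-queue the batch on a best-effort basis, then rethrow so the
              // flow fails and RestartFlow backs off before restarting it.
              readings.foreach(recycleQueue.offer)
              throw e
          }
      }
  })
  .runWith(Sink.ignore)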
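
To get a feel for what the backoff settings above (1.second, 3.seconds, 0.2) mean in practice, here is a minimal sketch of an exponential backoff schedule with jitter. The restartDelay helper and its jitter parameter are hypothetical names made up for this illustration, not part of Akka's API, and the formula only approximates how I understand the delay to be computed: double the minimum on each restart, cap it at the maximum, then stretch it by a random fraction of up to randomFactor.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical helper (not an Akka API): approximate delay before the
  // restart with the given zero-based count. `jitter` stands in for a
  // random number in [0, 1) so the function stays deterministic.
  def restartDelay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      jitter: Double
  ): FiniteDuration = {
    // Double the minimum backoff on every restart...
    val exponential = minBackoff.toMillis * math.pow(2, restartCount.toDouble)
    // ...but never go beyond the configured maximum.
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // Stretch the capped delay by up to randomFactor (20% in the snippet above).
    (capped * (1.0 + jitter * randomFactor)).toLong.millis
  }

  def main(args: Array[String]): Unit = {
    val rnd = new scala.util.Random()
    (0 to 3).foreach { n =>
      val delay = restartDelay(n, 1.second, 3.seconds, 0.2, rnd.nextDouble())
      println(s"restart $n: wait about $delay")
    }
  }
}
```

Under these assumptions the waits grow from roughly 1 second to roughly 2 seconds and then level off a little above 3 seconds, since the jitter is applied after the cap. Failed batches therefore sit in the recycle queue for a few seconds at most before the publisher is tried again.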