A complete server using Akka Streams that reads from a source, batches the data and then publishes each batch. If a batch cannot be published, the stream backs off and makes a best-effort attempt to send that data again.
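import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, RestartFlow, Sink, Source, StreamConverters}
import scala.concurrent.duration._

// Requires an implicit Materializer and ExecutionContext in scope;
// SoilStateReading and responder are defined elsewhere.

// Queue for readings that failed to publish. prefixAndTail(0) with Sink.head
// exposes the queue's output as a Future[Source] that can be merged below.
// Best effort: when the 100-element buffer is full, dropTail discards the
// most recently buffered reading to make room for the new one.
val (recycleQueue, recycleSource) =
  Source
    .queue[SoilStateReading](100, OverflowStrategy.dropTail)
    .prefixAndTail(0)
    .map(_._2)
    .toMat(Sink.head)(Keep.both)
    .run()

StreamConverters
  .fromInputStream(() => this.getClass.getClassLoader.getResourceAsStream("sensors.log"))
  .via(SoilStateReading.csvParser)
  .merge(Source.fromFutureSource(recycleSource))
  // Group up to 100 readings while downstream is busy; newest reading first.
  .batch(100, e => List(e))((a, e) => e +: a)
  // On failure, restart the publishing flow with exponential backoff:
  // 1 s minimum, 3 s maximum, 20% random jitter.
  .via(RestartFlow.withBackoff(1.second, 3.seconds, 0.2) { () =>
    Flow[Seq[SoilStateReading]]
      .mapAsync(1) { readings =>
        responder
          .post(readings)
          .recover {
            // Re-queue the batch, then rethrow so the flow fails and backs off.
            case e: IllegalStateException =>
              readings.foreach(recycleQueue.offer)
              throw e
          }
      }
  })
  .runWith(Sink.ignore)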
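
As a rough illustration of what the backoff settings passed to RestartFlow.withBackoff (1.second, 3.seconds, 0.2) mean, the delay before each restart can be approximated as below. This is a sketch and not Akka's implementation: backoffDelayMillis is a hypothetical helper, and it assumes the delay doubles with each restart, is capped at the maximum, and then has jitter of up to the random factor added on top. The random value is passed in as a parameter so that the result is reproducible.

```scala
// Hypothetical helper (not part of Akka): approximate delay in milliseconds
// before the restart with the given zero-based count.
def backoffDelayMillis(
    restartCount: Int,
    minMillis: Long,
    maxMillis: Long,
    randomFactor: Double,
    random: Double // assumed to be in [0, 1)
): Long = {
  // Double the minimum delay for every restart so far.
  val exponential = minMillis * math.pow(2, restartCount)
  // Never let the base delay exceed the configured maximum.
  val capped = math.min(exponential, maxMillis.toDouble)
  // Add jitter proportional to the random factor.
  (capped * (1.0 + random * randomFactor)).toLong
}

// Base delays without jitter for the settings used in the stream above.
val baseDelays = (0 to 3).map(n => backoffDelayMillis(n, 1000L, 3000L, 0.2, 0.0))
// baseDelays == Vector(1000, 2000, 3000, 3000)
```

Under these assumptions, repeated publishing failures would be retried after roughly 1 s, 2 s and then 3 s, each stretched by up to 20% of jitter.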