Pengin

Reputation: 4772

Akka stream batching

I'm learning Akka Streams. I have a stream of records, many per time unit, already ordered by time (coming from Slick). I want to batch them into per-time groups for processing by detecting when the time step changes.

Example

case class Record(time: Int, payload: String)

If the incoming stream is

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

I would like to transform this into

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...

So far I've only found operators that group by a fixed number of records or split the stream into many substreams, and as far as I can tell I don't need multiple substreams.

Update: I found batch, but it seems to be about coping with backpressure (it aggregates only while downstream is slower than upstream) rather than batching unconditionally.

Upvotes: 4

Views: 1298

Answers (1)

Jeffrey Chung

Reputation: 19507

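statefulMapConcat is the multitool in the Akka Streams library. It lets you keep mutable state per stream materialization and emit zero or more elements for each input, so you can accumulate records and emit a batch whenever the time changes.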

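import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

// Assumes Akka 2.6+, where an implicit ActorSystem provides the materializer
implicit val system: ActorSystem = ActorSystem("batching")

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  // Sentinel: its time (0) differs from the last real time, which forces the final batch out
  .concat(Source.single(Record(0, "notused")))

records
  .statefulMapConcat { () =>
    // State is created once per materialization of the stream
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        // Same time step: keep accumulating, emit nothing
        payloads = payloads :+ record.payload
        Nil
      } else {
        // Time step changed: emit the finished batch and start a new one
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)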

Running the above prints the following:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

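The first line, (0,List()), is the initial empty state, which is emitted when the first record arrives; filter it out if you don't want it. You can adjust the example to emit Batch objects instead of tuples.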
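One way to make that adjustment is sketched below. It is a variation on the code above, not a definitive implementation: it emits Batch objects, keeps the state as an Option so no empty initial batch is produced, and replaces the dummy Record sentinel with a None end marker. The names BatchByTime and batchByTime are made up for this illustration, and the setup assumes Akka 2.6+ (implicit ActorSystem as materializer).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String])

// Hypothetical helper object, named for this sketch only
object BatchByTime {

  // Groups consecutive records that share the same time into one Batch.
  // Assumes the input is already ordered by time.
  def batchByTime[M](records: Source[Record, M]): Source[Batch, M] =
    records
      .map(r => Option(r))
      // A single None marks the end of the stream so the last batch is flushed
      .concat(Source.single(Option.empty[Record]))
      .statefulMapConcat[Batch] { () =>
        // Batch currently being accumulated; None until the first record arrives
        var current: Option[Batch] = None

        in => in match {
          case Some(r) if current.exists(_.time == r.time) =>
            // Same time step: append the payload, emit nothing
            current = current.map(b => b.copy(payloads = b.payloads :+ r.payload))
            Nil
          case Some(r) =>
            // Time changed: emit the finished batch (if any) and start a new one
            val finished = current.toList
            current = Some(Batch(r.time, Vector(r.payload)))
            finished
          case None =>
            // End marker: flush whatever is left
            val finished = current.toList
            current = None
            finished
        }
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batching")

    val records = Source(List(
      Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
      Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"),
      Record(3, "!")
    ))

    val result = Await.result(batchByTime(records).runWith(Sink.seq), 5.seconds)
    result.foreach(println)
    system.terminate()
  }
}
```

With the sample records this should print one Batch per time step (1, 2 and 3) and nothing for an empty source. Because the end marker is a None rather than a Record with a reserved time, no time value has to be set aside as "unused".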

Upvotes: 5
