Reputation: 4772
Learning Akka Streams. I have a stream of records, many per time unit, already ordered by time (from Slick), and I want to batch them into time groups for processing by detecting when the time step changes.
Example
case class Record(time: Int, payload: String)
If the incoming stream is
Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...
I would like to transform this into
Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
So far I've only found grouping by a fixed number of records, or splitting into many substreams, but from my perspective I don't need multiple substreams.
Update: I found batch, but it looks more concerned with backpressure than with batching all the time.
Upvotes: 4
Views: 1298
Reputation: 19507
statefulMapConcat is the multitool in the Akka Streams library.
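import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

// Assumes Akka 2.6+, where an implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("batching")

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
    // sentinel with a different time, needed to flush (print) the last batch
    .concat(Source.single(Record(0, "notused")))

records
  .statefulMapConcat { () =>
    // mutable state, created once per materialization of the stream
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        // same time step: accumulate and emit nothing
        payloads = payloads :+ record.payload
        Nil
      } else {
        // time step changed: emit the finished batch and start a new one
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)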
Running the above prints the following:
(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))
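The first line, (0,List()), is the empty initial state, emitted when the first record arrives. You can adjust the example to print Batch objects and to skip that empty element.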
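One way to make that adjustment is sketched below, as an illustration rather than part of the original answer. It assumes the elements are wrapped in Option so that a trailing None marks the end of the stream instead of a dummy Record, and it keeps the open batch in an Option so that no empty initial batch is emitted. The names Batch (taken from the question) and batcher are introduced here for illustration. The stateful function is plain Scala, so it can be tried on a List without Akka; the comment at the end shows how it would plug into statefulMapConcat.

```scala
final case class Record(time: Int, payload: String)
final case class Batch(time: Int, payloads: Seq[String])

// Hypothetical helper: returns a fresh stateful function on each call.
// It emits a finished Batch whenever the time step changes, and flushes
// the open batch when it sees None (the assumed end-of-stream marker).
def batcher(): Option[Record] => List[Batch] = {
  var current: Option[Batch] = None // batch being accumulated, if any

  (in: Option[Record]) => in match {
    case Some(r) =>
      current match {
        case Some(b) if b.time == r.time =>
          // same time step: accumulate and emit nothing
          current = Some(b.copy(payloads = b.payloads :+ r.payload))
          Nil
        case previous =>
          // first record or new time step: emit the previous batch, if any
          current = Some(Batch(r.time, Vector(r.payload)))
          previous.toList
      }
    case None =>
      // end marker: flush whatever is still open
      val last = current.toList
      current = None
      last
  }
}

val input = List(
  Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
  Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"),
  Record(3, "!")
)

// Plain-Scala check of the logic: wrap in Option, append the end marker
val step = batcher()
val batches = (input.map(Option(_)) :+ None).flatMap(step)
batches.foreach(println)

// With Akka Streams the same function would be used roughly like this:
//   Source(input)
//     .map(Option(_))
//     .concat(Source.single(Option.empty[Record]))
//     .statefulMapConcat(() => batcher())
//     .runForeach(println)
```

For the sample input this yields three Batch values, one per time step, with no placeholder element at the start or end.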
Upvotes: 5