Reputation: 91
With previous versions of Akka Streams, groupBy
returned a Source
of Source
s that could be materialized into a Source[Seq[A]]
.
With Akka Streams 2.4 I see that groupBy
returns a SubFlow
- it's not clear to me how use this. The transformations I need to apply to the flow have to have the whole Seq
available, so I can't just map
over the SubFlow
(I think).
I've written a class that extends GraphStage
that does the aggregation via a mutable collection in the GraphStageLogic
, but is there in-built functionality for this? Am I missing the point of SubFlow
?
Upvotes: 3
Views: 597
Reputation: 91
I ended up writing a GraphStage
:
class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] {
val in: Inlet[A] = Inlet("in")
val out: Outlet[Seq[A]] = Outlet("out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var counter: Option[B] = None
private var aggregate = scala.collection.mutable.ArrayBuffer.empty[A]
setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in)
counter.fold({
counter = Some(f(element))
aggregate += element
pull(in)
}) { p =>
if (f(element) == p) {
aggregate += element
pull(in)
} else {
push(out, aggregate)
counter = Some(f(element))
aggregate = scala.collection.mutable.ArrayBuffer(element)
}
}
}
override def onUpstreamFinish(): Unit = {
emit(out, aggregate)
complete(out)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Upvotes: 0