Reputation: 4795
Say that I have two sources:
val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])
I'd like to create a Graph[...]
processing step in Akka Stream that based on the current value of the ticks
streams it consumes as much as possible in the values stream. So for instance, when values match I want to return all the elements that match in the second source, otherwise keep ticking resulting in an output like:
(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)
How would you implement this behaviour?
Upvotes: 6
Views: 316
Reputation: 2542
I'd recommend you take a look at the Akka Stream Documentation on this subject: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html
According to the site, you can implement a GraphStage like this:
final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
val in = Inlet[E]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")
override def shape = FlowShape(in, out)
}
There is also a blog post on this subject: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/
Hope this helps :)
Upvotes: 1