tonicebrian
tonicebrian

Reputation: 4795

How does one control the flow of an Akka Stream based on another stream

Say that I have two sources:

val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])

I'd like to create a Graph[...] processing step in Akka Stream that based on the current value of the ticks streams it consumes as much as possible in the values stream. So for instance, when values match I want to return all the elements that match in the second source, otherwise keep ticking resulting in an output like:

(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)

How would you implement this behaviour?

Upvotes: 6

Views: 316

Answers (1)

GJZ
GJZ

Reputation: 2542

I'd recommend you take a look at the Akka Stream Documentation on this subject: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

According to the site, you can implement a GraphStage like this:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

val in = Inlet[E]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

override def shape = FlowShape(in, out)
}

There is also a blog post on this subject: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

Hope this helps :)

Upvotes: 1

Related Questions