Reputation: 1132
I'm looking for a way to implement or use a fan-out stage that takes 1 input and broadcasts to N outputs in parallel, except that I want to partition the elements: each element should go only to a subset of the outputs.
Example: one input element may be emitted to 4 different outputs and another to 2 other outputs, depending on some function f:
source ~> partitionWithBroadcast // Outputs to some subset of [0,3] outputs
partitionWithBroadcast(0) ~> ...
partitionWithBroadcast(1) ~> ...
partitionWithBroadcast(2) ~> ...
partitionWithBroadcast(3) ~> ...
I searched the Akka documentation but couldn't find any stage that fits.
Any ideas?
Upvotes: 0
Views: 328
Reputation: 40296
What comes to mind is a FanOutShape with filters attached to each output. NOTE: I am not using the standard Partition operator because it emits to just 1 output, whereas the question asks to emit to any subset of the connected outputs. E.g.:
import akka.stream.FanOutShape4
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL}

def createPartial[E](partitioner: E => Set[Int]) = {
  GraphDSL.create[FanOutShape4[E,E,E,E,E]]() { implicit builder =>
    import GraphDSL.Implicits._
    // Pair each element with the set of outputs it should go to
    val flow = builder.add(Flow.fromFunction((e: E) => (e, partitioner(e))))
    val broadcast = builder.add(Broadcast[(E, Set[Int])](4))
    // Each output keeps only the elements assigned to it and drops the set again
    val flow0 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(0)).map(_._1))
    val flow1 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(1)).map(_._1))
    val flow2 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(2)).map(_._1))
    val flow3 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(3)).map(_._1))
    flow.out ~> broadcast.in
    broadcast.out(0) ~> flow0.in
    broadcast.out(1) ~> flow1.in
    broadcast.out(2) ~> flow2.in
    broadcast.out(3) ~> flow3.in
    new FanOutShape4[E,E,E,E,E](flow.in, flow0.out, flow1.out, flow2.out, flow3.out)
  }
}
The partitioner is a function that maps an element from upstream to the set of integers identifying the outputs that should receive it. The graph first pairs each element with that set, then broadcasts the resulting tuple. A flow attached to each of the outputs of the Broadcast selects the elements that the partitioner assigned to that output and strips the set again.
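To make the routing rule concrete without running a stream, here is a minimal plain-Scala sketch that models the same two steps on an ordinary collection. `RoutingModel` and `route` are hypothetical names used only for illustration; they are not part of Akka and do not appear in the answer's graph.

```scala
object RoutingModel {
  // Hypothetical helper: models what each of the n outputs would receive.
  def route[E](elements: Seq[E], n: Int)(partitioner: E => Set[Int]): Seq[Seq[E]] = {
    // Step 1: pair every element with its target outputs (the first flow).
    val tagged = elements.map(e => (e, partitioner(e)))
    // Step 2: every output sees every pair (Broadcast) and keeps only
    // the elements whose target set contains its index (filter + map).
    (0 until n).map { i =>
      tagged.collect { case (e, targets) if targets.contains(i) => e }
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("*__*", "**__", "__**", "_*__")
    // Same rule as the answer's example: a '*' at position i selects output i.
    val outputs = route(input, 4)(s => (0 to 3).filter(i => s(i) == '*').toSet)
    outputs.zipWithIndex.foreach { case (es, i) => println(s"$i: ${es.mkString(" ")}") }
  }
}
```

An element whose set is empty is dropped by every output, and an element whose set contains all indices behaves like a plain broadcast.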
Then use createPartial, e.g. as:
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher

// A '*' at position i means "send this string to output i"
def partitioner(s: String) = (0 to 3).filter(s(_) == '*').toSet

val src = Source(immutable.Seq("*__*", "**__", "__**", "_*__"))
val sink0 = Sink.seq[String]
val sink1 = Sink.seq[String]
val sink2 = Sink.seq[String]
val sink3 = Sink.seq[String]

// Combine the four materialized futures into one future of a 4-tuple
def toFutureTuple[X](f0: Future[X], f1: Future[X], f2: Future[X], f3: Future[X]) =
  for { a <- f0; b <- f1; c <- f2; d <- f3 } yield (a, b, c, d)

val g = RunnableGraph.fromGraph(GraphDSL.create(src, sink0, sink1, sink2, sink3)((_,f0,f1,f2,f3) => toFutureTuple(f0,f1,f2,f3)) { implicit builder =>
  (in, o0, o1, o2, o3) => {
    import GraphDSL.Implicits._
    val part = builder.add(createPartial(partitioner))
    in ~> part.in
    part.out0 ~> o0
    part.out1 ~> o1
    part.out2 ~> o2
    part.out3 ~> o3
    ClosedShape
  }
})

val result = Await.result(g.run(), 10.seconds)
println("0: " + result._1.mkString(" "))
println("1: " + result._2.mkString(" "))
println("2: " + result._3.mkString(" "))
println("3: " + result._4.mkString(" "))
// Prints:
//
// 0: *__* **__
// 1: **__ _*__
// 2: __**
// 3: *__* __**
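The graph above is fixed at four outputs. If the number of outputs should be a parameter, the same idea can probably be expressed with a UniformFanOutShape. The following is a sketch under that assumption, not a tested drop-in replacement: `FanOut` and `partitionWithBroadcast` are hypothetical names (the latter borrowed from the question), and the code assumes Akka 2.6.

```scala
import akka.NotUsed
import akka.stream.{Graph, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL}

object FanOut {
  // Hypothetical helper (not part of Akka): sends each element to the
  // subset of the n outputs that `partitioner` returns for it.
  def partitionWithBroadcast[E](n: Int)(
      partitioner: E => Set[Int]): Graph[UniformFanOutShape[E, E], NotUsed] =
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // Pair each element with its set of target outputs.
      val tag = builder.add(Flow[E].map(e => (e, partitioner(e))))
      val broadcast = builder.add(Broadcast[(E, Set[Int])](n))
      tag.out ~> broadcast.in
      // Output i keeps only the elements whose target set contains i.
      val outlets = (0 until n).map { i =>
        val select = builder.add(
          Flow[(E, Set[Int])].collect { case (e, targets) if targets.contains(i) => e })
        broadcast.out(i) ~> select.in
        select.out
      }
      UniformFanOutShape(tag.in, outlets: _*)
    }
}
```

After `builder.add(FanOut.partitionWithBroadcast(n)(f))` the ports are available as `in` and `out(i)`. Note that Broadcast only emits once all of its outputs have signalled demand, so a slow consumer on one output slows down the others, in this sketch as well as in the four-output version.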
Upvotes: 1
Reputation: 4123
First, implement your function to create the Partition:
def partitionFunction4[A](func: A => Int)(implicit builder: GraphDSL.Builder[NotUsed]) = {
  // partition with 4 output ports
  builder.add(Partition[A](4, inputElement => func(inputElement)))
}
Then, create another function that builds a Sink which passes each element to a log function, used here to print the element to the console:
def stream[A](log: A => Unit) = Flow.fromFunction[A, A](el => {
  log(el)
  el
}).to(Sink.ignore)
Connect all the elements in the graph function:
def graph[A](src: Source[A, NotUsed])
            (func4: A => Int, log: Int => A => Unit) = {
  RunnableGraph
    .fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partition4 = partitionFunction4(func4)
      // One log function per output port, four in total
      val flowSet0 = (0 until 4).map(in => log(in))
      src ~> partition4.in
      partition4.out(0) ~> stream(flowSet0(0))
      partition4.out(1) ~> stream(flowSet0(1))
      partition4.out(2) ~> stream(flowSet0(2))
      partition4.out(3) ~> stream(flowSet0(3))
      ClosedShape
    })
    .run() // requires an implicit ActorSystem (or Materializer) in scope
}
Create a Source that emits five Int elements (0 to 4). The partition function is "element % 4". Depending on the result of this function, each element is routed to the corresponding output:
val source1: Source[Int, NotUsed] = Source(0 to 4)
graph[Int](source1)(f1 => f1 % 4,
  in => {
    el =>
      println(s"Stream ${in} element ${el}")
  })
Obtaining as result:
Stream 0 element 0
Stream 1 element 1
Stream 2 element 2
Stream 3 element 3
Stream 0 element 4
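For comparison with the set-based approach, the routing performed here can be modelled in plain Scala: with Partition, the function returns a single index, so every element lands on exactly one output. The sketch below only models that rule on a collection. `PartitionModel` and `route` are hypothetical names for illustration, not Akka API.

```scala
object PartitionModel {
  // Hypothetical helper: groups elements by the single output index
  // that the partitioner picks for each of them.
  def route[A](elements: Seq[A], outputCount: Int)(partitioner: A => Int): Map[Int, Seq[A]] =
    elements.groupBy { e =>
      val port = partitioner(e)
      // The index must name one of the existing outputs.
      require(port >= 0 && port < outputCount, s"port $port out of range")
      port
    }

  def main(args: Array[String]): Unit = {
    val routed = route(0 to 4, 4)(_ % 4)
    routed.toSeq.sortBy(_._1).foreach { case (port, es) =>
      println(s"Stream $port: ${es.mkString(" ")}")
    }
  }
}
```

Elements 0 and 4 both map to index 0, which matches the two "Stream 0" lines in the result above. No element can reach more than one output this way.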
Upvotes: 1