Reputation: 2811
I have a Flow that connects a few FlowShapes, and it looks like this:
def mainFlow: Flow[MyGraphElement, MyGraphElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._ // needed for the ~> operator

  val someClassifier = builder.add(checkSomething) // FlowShape[MyGraphElement,MyGraphElement]
  val filteringRouter = builder.add(partitionBySomething) // UniformFanOutShape[MyGraphElement,MyGraphElement]
  val mlRouter = builder.add(partitionBySomethinfElse()) // UniformFanOutShape[MyGraphElement,MyGraphElement]
  val publishToSnsFlow = builder.add(publishEvidenceToSns()) // FlowShape[MyGraphElement,MyGraphElement]
  val updateTaskStatusDoneFlow1 = builder.add(updateTaskStatus()) // FlowShape[MyGraphElement,MyGraphElement]
  val updateTaskStatusDoneFlow2 = builder.add(updateTaskStatus())
  val updateTaskStatusDoneFlow3 = builder.add(updateTaskStatus())
  // Assumed: merge is not defined in the posted snippet; three branches feed into it
  val merge = builder.add(Merge[MyGraphElement](3))

  someClassifier ~> filteringRouter
  filteringRouter.out("case1") ~> publishToSnsFlow ~> updateTaskStatusDoneFlow1 ~> merge
  filteringRouter.out("case2") ~> someDeciderFlow ~> mlRouter
  mlRouter.out("case5") ~> doSomethingFlow ~> updateTaskStatusDoneFlow2 ~> merge
  mlRouter.out("case4") ~> doSomethingElseFlow ~> updateTaskStatusDoneFlow3 ~> merge

  FlowShape(someClassifier.in, merge.out)
})
My issue is that I need to call the same method three times under different names, since a FlowShape can only be used once inside a Flow. Or am I missing something? Can I do this differently to make it look more elegant? I am referring to updateTaskStatusDoneFlow1/2/3.
Thanks!
Upvotes: 0
Views: 122
Reputation: 1940
How about merging first, then running the merged stream through the flow once?
val someClassifier = builder.add(checkSomething) // FlowShape[MyGraphElement,MyGraphElement]
val filteringRouter = builder.add(partitionBySomething) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val mlRouter = builder.add(partitionBySomethinfElse()) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val publishToSnsFlow = builder.add(publishEvidenceToSns()) // FlowShape[MyGraphElement,MyGraphElement]
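val updateTaskStatusDoneFlow = builder.add(updateTaskStatus()) // FlowShape[MyGraphElement,MyGraphElement]
// Assumed: merge is not defined in the question's snippet; three branches feed into it
val merge = builder.add(Merge[MyGraphElement](3))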
someClassifier ~> filteringRouter
filteringRouter.out("case1") ~> publishToSnsFlow ~> merge
filteringRouter.out("case2") ~> someDeciderFlow ~> mlRouter
mlRouter.out("case5") ~> doSomethingFlow ~> merge
mlRouter.out("case4") ~> doSomethingElseFlow ~> merge
merge.out ~> updateTaskStatusDoneFlow
FlowShape(someClassifier.in, updateTaskStatusDoneFlow.out)
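For reference, here is a minimal, self-contained sketch of the same "merge first, then update once" wiring. It assumes Akka Streams 2.6+ (where an implicit ActorSystem provides the materializer). `Element`, the modulo-based `Partition`, and the body of `updateTaskStatus()` are hypothetical stand-ins for your `MyGraphElement`, routers, and real status update, not your actual code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergeThenUpdate {
  // Hypothetical stand-in for MyGraphElement
  final case class Element(id: Int, status: String = "pending")

  // Hypothetical stand-in for the real updateTaskStatus()
  def updateTaskStatus(): Flow[Element, Element, NotUsed] =
    Flow[Element].map(_.copy(status = "done"))

  def mainFlow: Flow[Element, Element, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Hypothetical router: three branches chosen by id modulo 3
      val router = builder.add(Partition[Element](3, e => e.id % 3))
      val merge  = builder.add(Merge[Element](3))
      // Added to the graph once, downstream of the merge
      val update = builder.add(updateTaskStatus())

      router.out(0) ~> merge
      router.out(1) ~> merge
      router.out(2) ~> merge
      merge.out ~> update

      FlowShape(router.in, update.out)
    })

  // Runs six elements through the flow and collects the results
  def runDemo(): Seq[Element] = {
    implicit val system: ActorSystem = ActorSystem("merge-then-update")
    try {
      val done = Source(1 to 6).map(Element(_)).via(mainFlow).runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Every element passes through the single `update` stage regardless of which branch it took. The order in which elements leave the merge is not guaranteed, so do not rely on it.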
Upvotes: 1