Knows Not Much

Reputation: 31546

Akka Streams how to write a GraphStage with OrElse

I have the following requirement. I am writing a GraphStage that needs to look up a SQL table (in a different db). Only if that lookup fails should it look up a second table (in a different db), and if that lookup also fails, it should look up a third table (again in a different db). If all lookups fail, it should use a default.

I googled and found this

http://doc.akka.io/japi/akka/current/akka/stream/scaladsl/OrElse.html

and also this thread

Alternative flows based on condition for akka stream

But broadcast and partition are not what I am looking for. I don't want to look up both tables simultaneously. What I want is that the second flow is used to fetch the value only if the first flow returns a None.

Right now I have done something like this

val flow = Flow[Foo].map{foo => 
    lookup1(foo.id) orElse lookup2(foo.id) getOrElse default
}

But this makes the flow above very monolithic. It would be nice if I could break it into 3 separate flows and then connect them via an orElse clause in my GraphStage.

Upvotes: 1

Views: 283

Answers (1)

Stefano Bonetti

Reputation: 9023

Using flatMapConcat and orElse might help you make your code more generic in terms of the number of sources you want to combine. See the example below.

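  import akka.NotUsed
  import akka.stream.scaladsl.{Flow, Source}

  val altFlows: List[Flow[Foo, Option[Bar], NotUsed]] = ???
  val default: Bar = ???

  Flow[Foo].flatMapConcat { foo =>
    // One source per lookup flow; each emits an element only if its lookup returned Some
    val altSources    = altFlows.map(Source.single(foo).via(_).collect { case Some(x) => x })
    // Fallback source, used when every lookup returned None
    val defaultSource = Source.single(default)

    // orElse falls through to the next source only if the previous one completed without emitting
    (altSources :+ defaultSource).reduce(_ orElse _)
  }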
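To see the fall-through end to end, here is a minimal runnable sketch of the same idea. It assumes Akka 2.6 or later (so the implicit ActorSystem provides the materializer) and the akka-stream dependency on the classpath. The in-memory maps `table1` to `table3`, the `Foo` and `Bar` case classes, `lookupFlow`, `defaultBar` and `run` are all hypothetical stand-ins for your real lookups, not part of any Akka API.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object OrElseLookupExample {
  // Hypothetical types standing in for the question's Foo and the looked-up value
  final case class Foo(id: Int)
  final case class Bar(value: String)

  // In-memory maps stand in for the three SQL tables (illustration only)
  val table1: Map[Int, Bar] = Map(1 -> Bar("from table 1"))
  val table2: Map[Int, Bar] = Map(2 -> Bar("from table 2"))
  val table3: Map[Int, Bar] = Map(3 -> Bar("from table 3"))

  // One small flow per table; None means the lookup found nothing
  def lookupFlow(table: Map[Int, Bar]): Flow[Foo, Option[Bar], NotUsed] =
    Flow[Foo].map(foo => table.get(foo.id))

  val altFlows: List[Flow[Foo, Option[Bar], NotUsed]] =
    List(table1, table2, table3).map(lookupFlow)

  val defaultBar: Bar = Bar("default")

  // For each Foo, chain the per-table sources with orElse, ending with the default
  val lookup: Flow[Foo, Bar, NotUsed] =
    Flow[Foo].flatMapConcat { foo =>
      val altSources = altFlows.map { flow =>
        Source.single(foo).via(flow).collect { case Some(bar) => bar }
      }
      (altSources :+ Source.single(defaultBar)).reduce(_ orElse _)
    }

  // Runs the flow over the given ids and collects the results
  def run(ids: List[Int]): List[Bar] = {
    implicit val system: ActorSystem = ActorSystem("or-else-example")
    try {
      val result = Source(ids).map(id => Foo(id)).via(lookup).runWith(Sink.seq)
      Await.result(result, 5.seconds).toList
    } finally {
      system.terminate()
    }
  }
}
```

With these stand-in tables, `OrElseLookupExample.run(List(1, 2, 3, 4))` should give one Bar per id, with id 4 falling through all three tables to the default. Each lookup stays a separate Flow, so adding a fourth table is just one more entry in `altFlows`.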

Upvotes: 1
