Alexander
Alexander

Reputation: 65

Sending element to second sink if only first sink is successful

I want to do following using akka streams..I tried broadcast ,and alsoTo but it didn't solve my problem

Source -> DBSink -> APISink.

I first want to send element to DB, when transaction successfully completed,I want to send it to APISink.. if transaction fails in first Sink,I don't want to send it to API..

Upvotes: 0

Views: 412

Answers (2)

scot
scot

Reputation: 1280

You are going to have to use a Flow for the database operation. The database flow should return some sort of composite object (e.g. a Pair, or a Try) that contain the data needed for the next step and the result of the database operation.

If the “unsuccessful” database operation means it throws an exception, you will definitely have to use a Try. Because if the database flow throws an exception, the stream will stop. If the database operation being “unsuccessful” is a negative result (e.g. not enough credit for operation, product code doesn’t exist in the db, something like that), then you can use a Pair which contains both the database result (maybe just a Boolean) and the data payload. If it’s both you will have to use a Try which contains a Pair so you can intercept both the exception cases and the “computer says no” business logic cases.

However the key here, is this is done with a Flow, not a Sink. A Sink consumes the data. A flow will both consume and return data, which is what you want.

On the result of the database Flow, you put a filter() or a filterNot() on the stream to remove the unsuccessful results, and after that pass it onto to downstream Sink (or another Flow) that completes the API call.

Upvotes: 1

Ivan Stanislavciuc
Ivan Stanislavciuc

Reputation: 7275

It's not entirely clear what you want to achieve.

You can send elements from a Source to multiple sinks using alsoTo.

Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.

val sink1 = Sink.foreach[Int](_ => "got it at sink1")
val sink2 = Sink.foreach[Int](_ => "got it at sink2")

Source(List(1, 2, 3))
  .alsoTo(sink1)
  .to(sink2)

The elements will be sent to both sinks only when both of them emit "demand". This behaves in the same way as Broadcast.

In case of an error, ie exception is being thrown in the stream processing, both sinks will be notified with an error and stream stops by default. It's matter of supervision strategy to say if stream needs stopping, restarting or resuming.

.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

Upvotes: 2

Related Questions