Ryan Worsley

Reputation: 655

Akka Streams - combine latest operation

I'd like to combine-latest with Akka Streams as described here.

I can't figure out how to do it - please help!

Thanks, Ryan.

Upvotes: 0

Views: 343

Answers (1)

tg44

Reputation: 850

I just implemented it quickly. I'm not sure it's bug-free, but it's worth a try :) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 Comments under the gist are welcome!

As we discussed on the Gitter channel, it can't be achieved with the built-in stages, but you can write the functionality as a custom stage. You will need two inputs and one output (this can be extended to N inputs), so it's a fan-in shape.

I store the incoming elements in Options: whenever an input is ready (i.e. it sends an element), I save that element to the corresponding Option. Whenever the output needs an element (and we already have at least one element from both inputs), I give it the values from the Options as a tuple. This is the backpressure-aware approach.
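The approach described above can be sketched roughly as follows. This is not the code from the gist, just a minimal illustration under some assumptions of mine: the class name `CombineLatest` and all field names are hypothetical, inputs are pulled eagerly (so intermediate values are dropped when the consumer is slow), a pair is pushed only when it has changed since the last push, and the stage completes once both inputs have finished.

```scala
import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical sketch of a two-input combine-latest stage (not the gist's code).
final class CombineLatest[A, B] extends GraphStage[FanInShape2[A, B, (A, B)]] {
  private val in0: Inlet[A] = Inlet[A]("CombineLatest.in0")
  private val in1: Inlet[B] = Inlet[B]("CombineLatest.in1")
  private val out: Outlet[(A, B)] = Outlet[(A, B)]("CombineLatest.out")

  override val shape: FanInShape2[A, B, (A, B)] = new FanInShape2(in0, in1, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Latest element seen on each input, None until the first one arrives.
      private var latestA: Option[A] = None
      private var latestB: Option[B] = None
      // True when the current pair has not been sent downstream yet.
      private var fresh = false
      private var runningUpstreams = 2

      // Ask both inputs for their first element right away.
      override def preStart(): Unit = { pull(in0); pull(in1) }

      private def currentPair: Option[(A, B)] =
        for (a <- latestA; b <- latestB) yield (a, b)

      // Push only if there is an unsent pair and downstream has asked for one.
      private def pushIfPossible(): Unit =
        if (fresh && isAvailable(out)) currentPair.foreach { pair =>
          push(out, pair)
          fresh = false
        }

      private def onElement(): Unit = {
        fresh = currentPair.isDefined
        pushIfPossible()
      }

      // Assumption: complete only after both inputs are done,
      // flushing a pending pair first.
      private def upstreamFinished(): Unit = {
        runningUpstreams -= 1
        if (runningUpstreams == 0) {
          if (fresh) currentPair.foreach(pair => emit(out, pair, () => completeStage()))
          else completeStage()
        }
      }

      setHandler(in0, new InHandler {
        override def onPush(): Unit = {
          latestA = Some(grab(in0))
          tryPull(in0) // tryPull, because the port may already be closed
          onElement()
        }
        override def onUpstreamFinish(): Unit = upstreamFinished()
      })

      setHandler(in1, new InHandler {
        override def onPush(): Unit = {
          latestB = Some(grab(in1))
          tryPull(in1)
          onElement()
        }
        override def onUpstreamFinish(): Unit = upstreamFinished()
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pushIfPossible()
      })
    }
}
```

To try it, add the stage to a graph with `GraphDSL` (`builder.add(new CombineLatest[Int, String])`) and connect two sources to its `in0` and `in1` ports. Because the inputs are always pulled, a slow consumer only ever sees the most recent pair, which is the trade-off this variant makes.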

For a backpressured approach (where you produce all the pairs), you need to handle waiting for an output element other than the last one, and you need to handle the input pulls. I think my implementation still does not handle the case of too-fast producers with a slow consumer (we can miss one element; this can be handled with emits), and it can deadlock if both inputs produce the same element multiple times (maybe emits can handle this too).

If you want to extend my code's functionality or write other custom stages, read this: http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html

Upvotes: 2
