Reputation: 1924
Let's assume that you want to store events from some stream into the database and a client library to that database is asynchronous.
E.g. there is a method writeEvent(event: MyEvent): Future[Boolean]
that you have to call inside onNext
of your observer when the next event is emitted. Is there a good way to do this otherwise than blocking on the Future
?
The only way that I currently see on how to implement this, is to create some custom Scheduler
that allows me to return thread to the pool, until async code inside onNext
is complete.
Upvotes: 2
Views: 417
Reputation: 2281
You do not want to block like that inside your onNext
subscriber callback, that would defeat Rx. It can be chained together in a more idiomatic way.
I don't work with Futures much, but I wonder if Observable.from(Future)
would work:
someStream
.flatMap(evt => Observable.from(writeEvent(evt)))
.subscribe(
next => ...,
err => ...
)
Upvotes: 2