Magomed Abdurakhmanov

Reputation: 1924

RxJava/RxScala async code inside Observer.onNext

Suppose you want to store events from a stream in a database, and the client library for that database is asynchronous.

For example, there is a method writeEvent(event: MyEvent): Future[Boolean] that you have to call inside your observer's onNext each time an event is emitted. Is there a good way to do this other than blocking on the Future?

The only approach I currently see is to create a custom Scheduler that lets me return the thread to the pool until the async code inside onNext completes.

Upvotes: 2

Views: 417

Answers (1)

drhr

Reputation: 2281

You do not want to block like that inside your onNext subscriber callback; that would defeat the purpose of Rx. The asynchronous write can be chained into the stream in a more idiomatic way.

I don't work with Futures much, but I wonder if Observable.from(Future) would work:

someStream
    .flatMap(evt => Observable.from(writeEvent(evt)))
    .subscribe(
        next => ...,
        err => ...
)
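
For anyone who wants to try this, here is a minimal, self-contained sketch of the same idea, assuming RxScala is on the classpath. The MyEvent case class, the body of writeEvent, and the AsyncWriteSketch and writeAll names are hypothetical stand-ins, not part of the question's code. As far as I know, Observable.from(Future) in RxScala takes an implicit ExecutionContext, which is why the global one is imported here.

```scala
import rx.lang.scala.Observable
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncWriteSketch {
  // Hypothetical event type; the question only names it MyEvent.
  final case class MyEvent(id: Int)

  // Hypothetical stand-in for the asynchronous database client:
  // it pretends the write succeeds for non-negative ids.
  def writeEvent(event: MyEvent): Future[Boolean] =
    Future { event.id >= 0 }

  // Each event is turned into a one-element Observable backed by its Future.
  // flatMap merges those inner Observables, so no thread is blocked
  // while a write is in flight.
  def writeAll(events: Observable[MyEvent]): Observable[Boolean] =
    events.flatMap(evt => Observable.from(writeEvent(evt)))

  def main(args: Array[String]): Unit = {
    val someStream = Observable.from(List(MyEvent(1), MyEvent(2), MyEvent(3)))
    // toBlocking is used only so this demo waits for the results before exiting;
    // real code would subscribe instead.
    val results = writeAll(someStream).toBlocking.toList
    println(results)
  }
}
```

One thing to keep in mind: writeEvent is invoked as soon as flatMap sees each event, so several writes can be in flight at once, and because flatMap merges the inner results they may arrive in a different order than the events were emitted.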

Upvotes: 2
