Abu Noman

Reputation: 455

How to add delay for some items in RxJava

I want to implement an Rx chain for the following purpose:

An observable source emits two types of items, S and E. I want an Observable/Flowable that emits every S immediately, but emits only the latest E, and only once a given delay (10 seconds) has passed since the first E arrived, provided no S arrives in the meantime.
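
To pin down the timing being asked for, here is a minimal plain-Java sketch (no RxJava) that replays timestamped items and returns what should come out. The names `DelayedLatestModel`, `Item` and `replay` are hypothetical, and the rule that an S discards a pending E is one reading of "if no S comes in the meantime", not something confirmed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reference model of the requested timing; it does not use RxJava. */
final class DelayedLatestModel {

    /** A timestamped item: kind is 'S' or 'E', timeSec is seconds since start. */
    static final class Item {
        final char kind;
        final long timeSec;
        final String value;

        Item(char kind, long timeSec, String value) {
            this.kind = kind;
            this.timeSec = timeSec;
            this.value = value;
        }

        @Override
        public String toString() {
            return value + "@" + timeSec;
        }
    }

    /** Replays inputs (sorted by time) and returns the expected emissions. */
    static List<Item> replay(List<Item> inputs, long delaySec) {
        List<Item> out = new ArrayList<>();
        Item pendingE = null; // latest E waiting for its deadline
        long deadline = 0;    // time at which the pending E is due
        for (Item in : inputs) {
            // Emit a pending E whose deadline passed before this input arrived.
            if (pendingE != null && deadline <= in.timeSec) {
                out.add(new Item('E', deadline, pendingE.value));
                pendingE = null;
            }
            if (in.kind == 'S') {
                out.add(in);     // S passes through immediately
                pendingE = null; // assumption: an S cancels the pending E
            } else {
                if (pendingE == null) {
                    deadline = in.timeSec + delaySec; // timer starts at the first E
                }
                pendingE = in;   // keep only the latest E
            }
        }
        // Assumption: the source stays open, so a still-pending E fires at its deadline.
        if (pendingE != null) {
            out.add(new Item('E', deadline, pendingE.value));
        }
        return out;
    }
}
```

For the inputs E1@0, E2@4, S1@20, E3@22, S2@25, E4@30 with a 10-second delay, `replay` returns `[E2@10, S1@20, S2@25, E4@40]`: E2 replaces E1 and goes out 10 s after E1, E3 is dropped because S2 arrives before its deadline, and E4 goes out 10 s after it arrived.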


Upvotes: 1

Views: 248

Answers (1)

Ana Santana

Reputation: 96

Instead of having a single observable emit both S and E, you could split the source into two observables, apply throttleLast with a 10-second interval to the "E stream", and then merge the two back together.

For example:

// Event is a hypothetical item type with a `type` field; source is assumed to be an Observable<Event>.
Observable<Event> sStream = source.filter(x -> x.type == Types.S);
Observable<Event> eStream = source.filter(x -> x.type == Types.E).throttleLast(10, TimeUnit.SECONDS);

Observable.merge(sStream, eStream).subscribe(...);
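
One thing worth checking before relying on this: throttleLast samples on a fixed period that starts at subscription, not at the first E, and because it is applied only to the E stream, an S arriving in between does not cancel the pending E. The plain-Java sketch below (not RxJava; `FixedWindowModel` and `emissionTimes` are hypothetical names) models when a fixed-period sampler would emit for a given list of E arrival times, so the difference from "10 seconds after the first E" is easy to see.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical timing model of fixed-period sampling; plain Java, not RxJava. */
final class FixedWindowModel {

    /**
     * Returns the times (seconds since subscription) at which a fixed-period
     * sampler would emit, given ascending arrival times of E items.
     * Assumption: an arrival exactly on a tick is counted in the next window.
     */
    static List<Long> emissionTimes(List<Long> arrivals, long periodSec) {
        List<Long> ticks = new ArrayList<>();
        for (long t : arrivals) {
            long tick = (t / periodSec + 1) * periodSec; // end of the window containing t
            if (ticks.isEmpty() || ticks.get(ticks.size() - 1) != tick) {
                ticks.add(tick); // at most one emission per non-empty window
            }
        }
        return ticks;
    }
}
```

With arrivals at 9, 12, 17 and 41 seconds and a 10-second period, the model gives `[10, 20, 50]`: the E that arrived at 9 s goes out at 10 s, only 1 s later. If the delay really has to run from the first E, a timer started on that E is probably closer to what the question asks for.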

Upvotes: 1
