Reputation: 455
I want to implement a RxChain for the following purpose:
An observable source is emitting two types of data (S, E). Now I want an observable/flowable which will emit all the S immediately but it should emit the latest E after a given delay (10 sec) from the first emission of E if no S comes in the meantime.
Upvotes: 1
Views: 248
Reputation: 96
Instead of having 1 observable source emitting both S and E, you could instead split them into 2 observables, add a throttleLast
10s on the "E stream" and then merge them together
e.g.
Observable<String> sStream = source.filter(x -> x.type == Types.S);
Observable<String> eStream = source.filter(x -> x.type == Types.E).throttleLast(10, TimeUnit.SECONDS);
Observable.merge(sStream, eStream).subscribe(...);
Upvotes: 1