melbic
melbic

Reputation: 12217

Accumulate Observables from different Streams for polling

I'm trying to accumulate Observables from server calls, for then flatmapping them and make another server call.

private Observable poll(Observable<TaskStatus> taskObservable) {
     Observable.add(taskObservable) //Pseudocode
    .buffer(3 sec) 
    .flatMap(...)
    ...
}

How can the Observable accumulation ("add") be achieved?

Upvotes: 0

Views: 483

Answers (3)

david-hoze
david-hoze

Reputation: 1075

You should use the window operator (See more information here, search for the version with window(source, timespan, unit)).

Your code should be something like this:

Observable.window(3, TimeUnit.SECONDS).flatmap(...)

Upvotes: 0

Chris Melinn
Chris Melinn

Reputation: 2076

For accumulation then, I think you might be after something like the CompositeDisposable that is part of Rx.NET.

Sample Rx.NET usage: http://rxwiki.wikidot.com/disposables#toc2

Documentation for Rx.NET class : http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx

I believe the Rx Java equivalent is the CompositeSubscription class.

See here: http://netflix.github.io/RxJava/javadoc/rx/subscriptions/CompositeSubscription.html

Upvotes: 0

Chris Melinn
Chris Melinn

Reputation: 2076

You're probably looking for the merge() operator.

For more information on combining Observables, see this: https://github.com/Netflix/RxJava/wiki/Combining-Observables

Upvotes: 1

Related Questions