Reputation: 251
I'm looking for an RxJava operator that merges several sources into a single emission. Currently I have this:
Disposable observable = Observable.concat(
service.loadPopCells().toObservable(),
service.loadChallangeData().toObservable(),
service.loadUserCell().toObservable()
)
.subscribe(data->sendtoViewmodel(data)); // called 3 times
I have three streams, so the subscribe callback is called three times, but I want it to be called once with all the data.
I'm looking to get something like this:
Disposable observable = Observable.concat(
service.loadPopCells().toObservable(),
service.loadChallangeData().toObservable(),
service.loadUserCell().toObservable()
)
// I want to achieve something like this (pseudocode)
.mapallresult(data, data2, data3) {
List<SimpleCell> shots = new ArrayList<>();
shots.add(data);
shots.add(data2);
shots.add(data3);
return shots; }
///
.subscribe(dataList -> sendtoViewmodel(dataList)); // called once
Upvotes: 2
Views: 462
Reputation: 13471
I think what you need is the zip operator: it runs all the observables, synchronously or asynchronously, and then delivers their results together.
Check these examples:
/**
* In this example the three observables are subscribed to sequentially on the calling thread, and the three items are combined and passed down the pipeline
*/
@Test
public void testZip() {
long start = System.currentTimeMillis();
Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Sync in:", start, result));
}
public void showResult(String transactionType, long start, String result) {
System.out.println(result + " " +
transactionType + String.valueOf(System.currentTimeMillis() - start));
}
public Observable<String> obString() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obString1() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obString2() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
Or the same example, asynchronous:
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
/**
* Since every observable passed to zip uses observeOn with a different scheduler, the operators after observeOn run on their own thread, so the three of them run in parallel.
* By default Rx is not async; it becomes async only if you explicitly use subscribeOn or observeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
private Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
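One caveat with the asynchronous version: subscribe returns immediately, so a test method may finish before the combined result is printed. As a rough plain-JDK analogue (not RxJava, and the names ParallelCombine, source and combine are hypothetical), the same "run in parallel, combine once, wait for the result" idea can be sketched with CompletableFuture:

```java
import java.util.concurrent.CompletableFuture;

public class ParallelCombine {

    // Hypothetical stand-in for one asynchronous source that yields a single value.
    public static CompletableFuture<String> source(String value) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread " + Thread.currentThread().getName());
            return value;
        });
    }

    // Starts the three sources, then combines their results exactly once,
    // when all three values are available.
    public static CompletableFuture<String> combine() {
        CompletableFuture<String> a = source("Hello");
        CompletableFuture<String> b = source(" World");
        CompletableFuture<String> c = source("!");
        return a.thenCombine(b, String::concat)
                .thenCombine(c, String::concat);
    }

    public static void main(String[] args) {
        // join() blocks until the combined value is ready, so the program
        // does not exit before the asynchronous work has finished.
        System.out.println(combine().join()); // prints "Hello World!"
    }
}
```

In RxJava itself, a blocking operator such as blockingFirst() plays the role that join() plays in this sketch.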
Upvotes: 1
Reputation: 2276
The zip operator will help you:
Observable.zip(
service.loadPopCells().toObservable(),
service.loadChallangeData().toObservable(),
service.loadUserCell().toObservable(),
(data1, data2, data3) -> Arrays.asList(data1, data2, data3))
.subscribe(dataList -> sendtoViewmodel(dataList));
Or even shorter:
Observable.zip(
service.loadPopCells().toObservable(),
service.loadChallangeData().toObservable(),
service.loadUserCell().toObservable(),
Arrays::asList)
.subscribe(this::sendtoViewmodel);
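To see why zip calls the subscriber once where concat calls it three times, here is a minimal plain-Java model of the two emission patterns. It is only a sketch over List sources, not RxJava's implementation, and the names ZipVsConcat, concat and zip are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ZipVsConcat {

    // Models concat: forwards every item of every source, one callback per item.
    public static <T> void concat(List<List<T>> sources, Consumer<T> onNext) {
        for (List<T> source : sources) {
            for (T item : source) {
                onNext.accept(item);
            }
        }
    }

    // Models zip: groups the i-th item of every source, one callback per group.
    // It stops at the shortest source, since no complete group can be formed after that.
    public static <T> void zip(List<List<T>> sources, Consumer<List<T>> onNext) {
        int shortest = sources.isEmpty() ? 0 : Integer.MAX_VALUE;
        for (List<T> source : sources) {
            shortest = Math.min(shortest, source.size());
        }
        for (int i = 0; i < shortest; i++) {
            List<T> group = new ArrayList<>();
            for (List<T> source : sources) {
                group.add(source.get(i));
            }
            onNext.accept(group);
        }
    }

    public static void main(String[] args) {
        // Three single-item sources, standing in for the three service calls.
        List<List<String>> sources = Arrays.asList(
                Arrays.asList("popCells"),
                Arrays.asList("challengeData"),
                Arrays.asList("userCell"));

        List<String> concatCalls = new ArrayList<>();
        concat(sources, concatCalls::add);
        System.out.println("concat callbacks: " + concatCalls.size()); // prints "concat callbacks: 3"

        List<List<String>> zipCalls = new ArrayList<>();
        zip(sources, zipCalls::add);
        System.out.println("zip callbacks: " + zipCalls.size()); // prints "zip callbacks: 1"
    }
}
```

Because each source here emits exactly one item, the zip model produces exactly one group, which is the "called once with all data" behaviour asked for in the question.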
Upvotes: 2