user46772
user46772

Reputation: 251

Looking for Rxjava operator to merge sources into one stream

Looking for Rxjava operator to merge sources into one stream currently have this

  Disposable observable = Observable.concat(
                service.loadPopCells().toObservable(),
                service.loadChallangeData().toObservable(),
                service.loadUserCell().toObservable()
        )              
  .subscribe(data->sendtoViewmodel(data)); // called 3 times

i have 3 stream , so on subscribe is called three time, but i want it to be called once with all data

looking to get something like this

    Disposable observable = Observable.concat(
                service.loadPopCells().toObservable(),
                service.loadChallangeData().toObservable(),
                service.loadUserCell().toObservable()
        )
        // i want to achieve something like this 
        .mapallresult(data,data2,data3){ 
         private List<SimpleCell> shots = new ArrayList<>();
         shots.add(data);
         shots.add(data2);
         shots.add(data2);
         return shots;  }
         ///
         .subscribe(dataList->sendtoViewmodel(dataList); // called once 

Upvotes: 2

Views: 462

Answers (2)

paul
paul

Reputation: 13471

I think what you need is use the zip operator, to run all observable operation in sync or async and then get the results together

Check this examples

  /**
     * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
     */
    @Test
    public void testZip() {
        long start = System.currentTimeMillis();
        Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                .concat(s3))
                .subscribe(result -> showResult("Sync in:", start, result));
    }


    public void showResult(String transactionType, long start, String result) {
        System.out.println(result + " " +
                transactionType + String.valueOf(System.currentTimeMillis() - start));
    }

    public Observable<String> obString() {
        return Observable.just("")
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> "Hello");
    }

    public Observable<String> obString1() {
        return Observable.just("")
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> " World");
    }

    public Observable<String> obString2() {
        return Observable.just("")
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> "!");
    }

Or the same example async

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

/**
 * Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
 */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
            .concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

private Observable<String> obAsyncString() {
    return Observable.just("")
            .observeOn(scheduler)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "Hello");
}

private Observable<String> obAsyncString1() {
    return Observable.just("")
            .observeOn(scheduler1)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> " World");
}

private Observable<String> obAsyncString2() {
    return Observable.just("")
            .observeOn(scheduler2)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "!");
}

Upvotes: 1

Dmitry Gorkovets
Dmitry Gorkovets

Reputation: 2276

zip operator will help you:

Observable.zip(
        service.loadPopCells().toObservable(),
        service.loadChallangeData().toObservable(),
        service.loadUserCell().toObservable(),
        (data1, data2, data3) -> Arrays.asList(data1, data2, data3))
        .subscribe(dataList -> sendtoViewmodel(dataList));
    }

Or even shorter:

Observable.zip(
    service.loadPopCells().toObservable(),
    service.loadChallangeData().toObservable(),
    service.loadUserCell().toObservable(),
    Arrays::asList)
    .subscribe(this::sendtoViewmodel);

Upvotes: 2

Related Questions