Lavekush

Reputation: 6166

RxJava - fetch every item on the list and emits one-by-one (any order)

I have a method that returns an Observable<List<Long>> containing the ids of some Items. I'd like to go through this list and download every Item using another method that returns Observable<Item>. Currently I'm doing it with the code below.

    @Override
    public Observable<List<Item>> getResponses(List<Long> requests) {
        return Observable.from(requests)
                .flatMap((Func1<Long, Observable<Item>>) restRequest -> getResponseEach(restRequest))
                .toList();
    }

This works, but it delivers all the responses in one go: my onNext() is invoked only once, after every download has finished.

Main question: I need each response emitted individually (in any order) as soon as its item has been fetched from the server, so that onNext() is invoked once per item.

How would I do this using RxJava operators?

Upvotes: 0

Views: 2096

Answers (1)

Sarath Kn

Reputation: 2725

You have to remove the toList() operator. toList() emits only after the upstream has completed: it collects every result and emits them together as one list (a Single<List<Item>> in RxJava 2, an Observable<List<Item>> in RxJava 1).

Return the Observable produced by flatMap directly and you will get the results one by one, in whatever order the requests finish:

    public Observable<Item> getResponses(List<Long> requests) {
        // fromIterable is the RxJava 2 name; on RxJava 1 use Observable.from(requests)
        return Observable.fromIterable(requests)
                .flatMap(restRequest -> getResponseEach(restRequest));
    }

Now your getResponses method returns Observable<Item> instead of Observable<List<Item>>.

You can subscribe to the returned Observable as follows:

    getResponses(ids)
           .subscribe(new DisposableObserver<Item>() {
               @Override
               public void onNext(Item item) {
                   // each item will be received here one by one
               }

               @Override
               public void onError(Throwable e) {
                   // handle any error that occurred during the operation
               }

               @Override
               public void onComplete() {
                   // all operations completed
               } 
           });
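
If it helps to see the difference between the two delivery styles without pulling in RxJava, here is a minimal plain-Java sketch. It is only an analogy, not RxJava code: CompletableFuture stands in for Observable, and the names PerItemDelivery, fetchItem, fetchAllThenDeliver and fetchAndDeliverEach are hypothetical, with fetchItem playing the role of getResponseEach. The first method behaves like flatMap(...).toList() (one callback after everything has finished); the second behaves like flatMap(...) alone (one callback per item, in completion order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class PerItemDelivery {

    // Hypothetical stand-in for getResponseEach(id): "downloads" one item asynchronously.
    static CompletableFuture<String> fetchItem(long id) {
        return CompletableFuture.supplyAsync(() -> "item-" + id);
    }

    // Analogue of flatMap(...).toList(): onAll fires once, after every fetch is done.
    static CompletableFuture<Void> fetchAllThenDeliver(List<Long> ids,
                                                       Consumer<List<String>> onAll) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (long id : ids) {
            pending.add(fetchItem(id));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenRun(() -> {
                    List<String> items = new ArrayList<>();
                    for (CompletableFuture<String> f : pending) {
                        items.add(f.join()); // already complete, so join() does not block
                    }
                    onAll.accept(items);
                });
    }

    // Analogue of flatMap(...) without toList(): onEach fires once per item,
    // as soon as that item is ready. The callback may run on a pool thread.
    static CompletableFuture<Void> fetchAndDeliverEach(List<Long> ids,
                                                       Consumer<String> onEach) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (long id : ids) {
            pending.add(fetchItem(id).thenAccept(onEach));
        }
        // The returned future completes when every item has been delivered.
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L);
        fetchAllThenDeliver(ids, items -> System.out.println("batch: " + items)).join();
        fetchAndDeliverEach(ids, item -> System.out.println("single: " + item)).join();
    }
}
```

The "single" lines can appear in any order between runs, which mirrors the any-order behaviour of flatMap.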

Upvotes: 2
