Selenir

Reputation: 718

Convert Observable of List to List of Observables and merge in RxJava

I'm learning Java on Android by building a Hacker News reader app.

What I'm trying to do is:

  1. Send a request to /topstories, which returns Observable<List<Integer>> and emits when the request finishes.
  2. Map each storyId to an Observable<Story>.
  3. Merge the Observables into one entity that emits List<Story> when all requests have finished.

Here is the code so far:

  private Observable<Story> getStoryById(int articleId) {
    BehaviorSubject<Story> subject = BehaviorSubject.create();
    // calls subject.onNext on success
    JsonObjectRequest request = createStoryRequest(articleId, subject);
    requestQueue.add(request);
    return subject;
  }

  public Observable<ArrayList<Story>> getTopStories(int amount) {
    Observable<ArrayList<Integer>> topStoryIds = (storyIdCache == null)
      ? fetchTopIds()
      : Observable.just(storyIdCache);

    return topStoryIds
      .flatMap(id -> getStoryById(id))
      // some magic here
  }

Then we would use it like this:

getTopStories(20)
  .subscribe(stories -> ...)

Upvotes: 4

Views: 8448

Answers (2)

Sergej Isbrecht

Reputation: 4002

Please have a look at my solution. I changed your interface so that getStoryById() returns a Single, because it should emit only one value. After that, I created one Single request per Story and subscribed to all of them with Single.zip. Zip executes the given lambda once all Singles have finished. One drawback is that all requests are fired at once. If you do not want this, I will update my post. Please take into consideration that @elmorabea's solution will also subscribe to the first 128 elements (BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));), and to the next element when one finishes.

@Test
  void name() {
    Api api = mock(Api.class);

    when(api.getTopStories()).thenReturn(Flowable.just(Arrays.asList(new Story(1), new Story(2))));
    when(api.getStoryById(eq(1))).thenReturn(Single.just(new Story(888)));
    when(api.getStoryById(eq(2))).thenReturn(Single.just(new Story(888)));

    Flowable<List<Story>> listFlowable =
        api.getTopStories()
            .flatMapSingle(
                stories -> {
                  List<Single<Story>> collect =
                      stories
                          .stream()
                          .map(story -> api.getStoryById(story.id))
                          .collect(Collectors.toList());

                  // possibly not the best idea to subscribe to all singles at the same time
                  Single<List<Story>> zip =
                      Single.zip(
                          collect,
                          objects -> {
                            return Arrays.stream(objects)
                                .map(o -> (Story) o)
                                .collect(Collectors.toList());
                          });

                  return zip;
                });

    TestSubscriber<List<Story>> listTestSubscriber =
        listFlowable.test().assertComplete().assertValueCount(1).assertNoErrors();

    List<List<Story>> values = listTestSubscriber.values();

    List<Story> stories = values.get(0);

    assertThat(stories.size()).isEqualTo(2);
    assertThat(stories.get(0).id).isEqualTo(888);
    assertThat(stories.get(1).id).isEqualTo(888);
  }

  interface Api {
    Flowable<List<Story>> getTopStories();

    Single<Story> getStoryById(int id);
  }

  static class Story {
    private final int id;

    Story(int id) {
      this.id = id;
    }
  }
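
If firing every request at once is a concern, one option is to cap the number of in-flight requests with the flatMap overload that takes a maxConcurrency argument. The following is only a minimal sketch under assumptions, not part of the test above: BoundedFetch, fetchAll, and the stubbed getStoryById (which returns a String instead of a real Story) are hypothetical names, and the imports assume RxJava 2 (io.reactivex).

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.Arrays;
import java.util.List;

public class BoundedFetch {

  // Hypothetical stand-in for the real network call; emits one value.
  static Single<String> getStoryById(int id) {
    return Single.just("story-" + id);
  }

  // Subscribes to at most maxConcurrency story requests at a time
  // and collects every result into a single list.
  static Single<List<String>> fetchAll(List<Integer> ids, int maxConcurrency) {
    return Observable.fromIterable(ids)
        .flatMap(id -> getStoryById(id).toObservable(), maxConcurrency)
        .toList();
  }

  public static void main(String[] args) {
    List<String> stories = fetchAll(Arrays.asList(1, 2, 3), 2).blockingGet();
    System.out.println(stories.size() + " stories loaded");
  }
}
```

With truly asynchronous sources, flatMap emits results in completion order, so the list may not follow the order of the ids.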

Upvotes: 1

elmorabea

Reputation: 3263

You can try something like this:

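Observable<List<Integer>> ids = getIdsObservable();
Single<List<Story>> listSingle =
            ids.flatMapIterable(list -> list)   // emit each id individually
            .flatMap(id -> getStoryById(id))    // request each story
            .toList();                          // collect all stories once every request completes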

Then you can subscribe to that Single to get the List<Story>.
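
One thing to keep in mind: flatMap merges results as they arrive, so the list is not guaranteed to follow the order of the ids. If the order matters, concatMapEager is one alternative, since it still subscribes eagerly but emits in source order. Below is a rough sketch assuming RxJava 2 (io.reactivex); OrderedStories, inIdOrder, and the stubbed getStoryById with its artificial delays are hypothetical and only there to simulate responses arriving out of order.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OrderedStories {

  // Hypothetical stub: lower ids respond later, to simulate
  // network responses that come back out of order.
  static Observable<String> getStoryById(int id) {
    long delayMs = Math.max(0L, 40L - 10L * id);
    return Observable.just("story-" + id).delay(delayMs, TimeUnit.MILLISECONDS);
  }

  // All requests start eagerly, but results are emitted in the order of the ids.
  static Single<List<String>> inIdOrder(Observable<List<Integer>> ids) {
    return ids.flatMapIterable(list -> list)
        .concatMapEager(id -> getStoryById(id))
        .toList();
  }

  public static void main(String[] args) {
    Observable<List<Integer>> ids = Observable.just(Arrays.asList(1, 2, 3));
    System.out.println(inIdOrder(ids).blockingGet());
  }
}
```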

Upvotes: 6
