Humble Student

Reputation: 3955

Why is initial stream being triggered again after combineLatest and merge in this example?

Look at the excerpt below:

  let requestStream = Rx.Observable
    .of(`${GITHUB_API}?since=${randomNumber()}`)
    .mergeMap(url => {
      console.log(`performing request to: ${url}`)
      return Rx.Observable.from(jQuery.getJSON(url))
    });

  let refreshStream = Rx.Observable.fromEvent(refreshButton, 'click')
    .startWith('click')
    .do(_ => users.empty())
    .combineLatest(requestStream, (_, users) => users.slice(randomNumber(users.length)));

  let randomUserStream = userRemovedStream
    .combineLatest(requestStream, (_, users) => users[randomNumber(users.length)]);

  requestStream
    .merge(refreshStream)
    .flatMap(users => users)
    .merge(randomUserStream)
    .filter(_ => users.children().length < MAX_SUGGESTIONS)
    .do(user => users.append(createItem(user)))
    .mergeMap(user => Rx.Observable.fromEvent($(`#close-${user.login}`), 'click'))
    .map(event => event.target.parentNode)
    .subscribe(user => {
      user.remove();
      userRemovedStream.next('');
    });

The requestStream returns an array with 100 users; however, I am consuming only three (MAX_SUGGESTIONS) of them at a time. refreshStream and randomUserStream exist in order to reuse the other 97 users from requestStream. The problem is that when I run the code above, I still see performing request to: ... logged to the console three times.

I've noticed that this happens after adding the merge calls to the last stream, but I am not sure why this behaviour occurs.

My understanding is: when I merge refreshStream and randomUserStream, whenever a new item is emitted (a click on the refresh button for the former, a click on a remove button for the latter), the array previously emitted by requestStream is processed and passed forward instead of the click itself. This should not re-trigger requestStream.

Can someone help me understand why this is happening and how to deal with it, so I can get the most out of the users already returned by the API during the first call?

Upvotes: 0

Views: 34

Answers (1)

paulpdaniels

Reputation: 18663

It happens because you effectively have three subscriptions to requestStream. Your intuition about how the three streams interact is correct. However, because your requestStream Observable is cold, it creates a new stream each time there is a subscription.

It isn't necessarily obvious, because only one subscription is made explicitly, but each time you pass requestStream to combineLatest it ends up creating a new subscription. Each new subscription starts a new stream, which in this case calls your underlying API.
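
To see the effect in isolation, here is a dependency-free toy model of a cold source. It is not RxJS itself, and coldSource, requestCount and the placeholder payload are hypothetical names used only for illustration. Each subscribe call re-runs the producer, so three subscribers mean three "requests":

```javascript
// Toy model of a cold observable: the producer runs once per subscribe call.
// This is NOT RxJS's implementation, only an illustration of the semantics.
let requestCount = 0;

function coldSource() {
  return {
    subscribe(next) {
      requestCount += 1;            // stands in for the API call
      next(['user-a', 'user-b']);   // placeholder payload
    }
  };
}

const requestStream = coldSource();

// In the question there is one subscription through merge and two made
// internally by combineLatest; modeled here as three plain subscribe calls.
requestStream.subscribe(() => {});
requestStream.subscribe(() => {});
requestStream.subscribe(() => {});

console.log(`requests performed: ${requestCount}`); // prints "requests performed: 3"
```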

If you don't want that to happen, I would suggest using a multicasting operator like publishLast.

So requestStream will become:

  let requestStream = Rx.Observable
    .of(`${GITHUB_API}?since=${randomNumber()}`)
    .mergeMap(url => {
      console.log(`performing request to: ${url}`);
      return Rx.Observable.from(jQuery.getJSON(url));
    })
    .publishLast();

In this case requestStream is now a ConnectableObservable, so you also need to start it at some point by calling connect(). Usually you would wait until all of your subscribers are hooked up.

/* Rest of your example */
.map(event => event.target.parentNode)
.subscribe(user => {
  user.remove();
  userRemovedStream.next('');
});

requestStream.connect();
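
As a rough illustration of why this results in a single request, here is a dependency-free toy model of the publishLast and connect behaviour. It is a sketch under stated assumptions, not RxJS's implementation: publishLastModel, sharedRequest and the placeholder payload are hypothetical names. Subscribers register first, connect() runs the producer once, and every subscriber receives the same final value:

```javascript
// Toy model of publishLast(): NOT RxJS's implementation, names are hypothetical.
let requestCount = 0;

function publishLastModel(producer) {
  const subscribers = [];
  let connected = false;
  let lastValue;
  return {
    subscribe(next) {
      if (connected) {
        next(lastValue);         // late subscribers get the cached value
      } else {
        subscribers.push(next);  // early subscribers wait for connect()
      }
    },
    connect() {
      if (connected) return;     // the producer runs at most once
      lastValue = producer();
      connected = true;
      subscribers.forEach(next => next(lastValue));
    }
  };
}

const sharedRequest = publishLastModel(() => {
  requestCount += 1;             // stands in for the API call
  return ['user-a', 'user-b'];   // placeholder payload
});

const received = [];
sharedRequest.subscribe(users => received.push(users));
sharedRequest.subscribe(users => received.push(users));
sharedRequest.subscribe(users => received.push(users));
sharedRequest.connect();

console.log(`requests: ${requestCount}, deliveries: ${received.length}`);
```

All three subscribers receive the same array while the producer runs only once, which is the property the question is after.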

Upvotes: 1
