searsaw

Reputation: 3632

How to forEach over a whole stream after a set interval in RxJS

I have a stream of zip codes that I want to iterate over at a set interval. For each zip code, I send a request to an API and update the temperature shown on the page, and I have to send the requests one at a time. So every time the interval fires, I need all the distinct zip codes entered so far, so that I can iterate over the whole set and refresh each temperature.

// Get stream of zip codes
const zipcodeStream =
  Rx.Observable
    .fromEvent(zipcodeInput, 'input')
    .map(e => e.target.value)
    .filter(zip => zip.length === 5);

// Create a timer to refresh the data
Rx.Observable
  .interval(5000)
  .zip(zipcodeStream, (i, zip) => zip)
  .forEach((...args) => {
    console.log('interval forEach args', ...args);
  });

This emits only a single zip code, and only once a new zip code has been entered and the interval has fired. I want access to all of them on each tick so that I can iterate over them.

Upvotes: 1

Views: 482

Answers (1)

int3h

Reputation: 461

Since you want to save all the items emitted by zipcodeStream so that you can iterate over them every 5 seconds, you'll need a ReplaySubject. A ReplaySubject buffers every item it receives and replays the buffered items to each Observer that subscribes.

In contrast, your current zipcodeStream Observable is a "hot" observable. Its values come from DOM input events, which fire whether or not anyone is listening, so any later subscriber will miss the items emitted before it subscribed.
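To make the replay behaviour concrete, here is a minimal plain-JavaScript sketch of the idea. TinyReplaySubject is a hypothetical stand-in written for illustration; it is not how RxJS implements ReplaySubject, and it leaves out errors, completion, and buffer limits.

```javascript
// Hypothetical stand-in for a replay subject; not the RxJS implementation.
class TinyReplaySubject {
  constructor() {
    this.buffer = [];    // every value received so far
    this.observers = []; // callbacks that are currently subscribed
  }

  // Record the value, then push it to everyone already subscribed.
  next(value) {
    this.buffer.push(value);
    this.observers.forEach(observer => observer(value));
  }

  // Replay the whole buffer to the new observer, then keep it subscribed.
  subscribe(observer) {
    this.buffer.forEach(value => observer(value));
    this.observers.push(observer);
    // Return a function that removes this observer again.
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }
}

const subject = new TinyReplaySubject();
subject.next('90210'); // emitted before anyone subscribes
subject.next('10001');

const lateValues = [];
const unsubscribe = subject.subscribe(zip => lateValues.push(zip));
subject.next('60601'); // delivered live to the late subscriber
unsubscribe();
subject.next('73301'); // not delivered, the observer is gone

// lateValues is now ['90210', '10001', '60601']
```

The late subscriber still sees the two zip codes that were emitted before it subscribed, which a plain hot source would not give it. In RxJS the built-in ReplaySubject plays this role, as in the code that follows.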

const zipcodeStream =
  Rx.Observable
    .fromEvent(zipcodeInput, 'input')
    .map(e => e.target.value)
    .filter(zip => zip.length === 5);

const zipcodeSubject = new Rx.ReplaySubject();
const zipcodeDisposable = zipcodeStream.subscribe(zipcodeSubject);

Rx.Observable
  .interval(5000)
  // Every 5000ms, resubscribe to zipcodeSubject, which replays every item received so far
  .flatMapLatest(() => zipcodeSubject)
  .forEach((...args) => {
    console.log('interval forEach args', ...args);
  });
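Note that a ReplaySubject replays every value it received, including repeats, while the question asks for distinct zip codes requested one at a time. As a rough sketch of that part only, written in plain JavaScript rather than RxJS, a Set can hold the distinct values and an async loop can visit them sequentially. The names rememberZip, refreshAll, fetchTemperature, and updatePage are all hypothetical, and fetchTemperature is assumed to return a Promise.

```javascript
// Hypothetical names throughout; fetchTemperature stands in for the real API call.
const seenZips = new Set(); // a Set keeps each zip code only once

// Call this from the input handler for every value typed.
function rememberZip(zip) {
  if (zip.length === 5) {
    seenZips.add(zip);
  }
}

// Visit every distinct zip code, one request at a time.
async function refreshAll(fetchTemperature) {
  const readings = [];
  for (const zip of Array.from(seenZips)) {
    // Awaiting inside the loop keeps a single request in flight.
    const temperature = await fetchTemperature(zip);
    readings.push({ zip, temperature });
  }
  return readings;
}

// Simulated input: one value is too short and one is a repeat.
['90210', '1000', '10001', '90210'].forEach(rememberZip);

// On the page this would run on a timer, for example:
// setInterval(() => refreshAll(fetchTemperature).then(updatePage), 5000);
```

Iterating over a copy of the Set means zip codes typed while a refresh is running are picked up on the next tick instead of extending the current one.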

Upvotes: 1
