Reputation: 3632
I have a stream of zip codes that I want to iterate over at a set interval. I'm trying to update the temperature shown on the page by sending the zip codes to an API and getting the data back, one zip code at a time. So after each interval I need to get all the distinct zip codes entered so far, iterate over them, and update the temperature on the page.
// Get stream of zip codes
const zipcodeStream =
  Rx.Observable
    .fromEvent(zipcodeInput, 'input')
    .map(e => e.target.value)
    .filter(zip => zip.length === 5);

// Create a timer to refresh the data
Rx.Observable
  .interval(5000)
  // The result selector receives the paired values as separate arguments
  .zip(zipcodeStream, (i, zip) => zip)
  .forEach((...args) => {
    console.log('interval forEach args', ...args);
  });
This only sends a single zip code when a new zip code is entered and the interval has passed. I want access to them all to iterate over.
Upvotes: 1
Views: 482
Reputation: 461
Since you want to save all the items emitted by zipcodeStream so you can iterate over them every 5 seconds, you'll need to use a ReplaySubject. A ReplaySubject saves every item it receives and replays them all to each Observer that subscribes.
In contrast, your current zipcodeStream Observable is a "hot" observable: the input events it wraps fire whether or not anything is subscribed, so any later subscriber will miss the items emitted before it subscribed.
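To make the difference concrete, here is a minimal plain-JavaScript sketch of the two behaviours. It does not use RxJS: TinySubject and TinyReplaySubject are hypothetical names invented for this illustration, and they only mimic the buffering idea, not the real library classes.

```javascript
// Hypothetical, library-free models; not the RxJS implementations.

// "Hot" behaviour: values are pushed to whoever is subscribed right now.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(observer => observer(value));
  }
}

// Replay behaviour: every value is also buffered, and the buffer is
// replayed to each new subscriber before it receives live values.
class TinyReplaySubject extends TinySubject {
  constructor() {
    super();
    this.buffer = [];
  }
  subscribe(observer) {
    this.buffer.forEach(value => observer(value));
    super.subscribe(observer);
  }
  next(value) {
    this.buffer.push(value);
    super.next(value);
  }
}

const hot = new TinySubject();
const replay = new TinyReplaySubject();

// Two zip codes arrive before anyone subscribes.
['90210', '10001'].forEach(zip => {
  hot.next(zip);
  replay.next(zip);
});

const seenByHot = [];
const seenByReplay = [];
hot.subscribe(zip => seenByHot.push(zip));
replay.subscribe(zip => seenByReplay.push(zip));

// A third zip code arrives after both have subscribed.
hot.next('60601');
replay.next('60601');

console.log(seenByHot);    // only the value emitted after subscribing
console.log(seenByReplay); // the two buffered values, then the new one
```

The late subscriber on the plain subject never sees the first two zip codes, while the replaying one catches up on them first. With RxJS, the same buffering is obtained by routing zipcodeStream through a ReplaySubject: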
const zipcodeStream =
  Rx.Observable
    .fromEvent(zipcodeInput, 'input')
    .map(e => e.target.value)
    .filter(zip => zip.length === 5);

const zipcodeSubject = new Rx.ReplaySubject();
const zipcodeDisposable = zipcodeStream.subscribe(zipcodeSubject);

Rx.Observable
  .interval(5000)
  // Every 5000ms, will emit all items from zipcodeSubject
  .flatMapLatest(() => zipcodeSubject)
  .forEach((...args) => {
    console.log('interval forEach args', ...args);
  });
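Note that a replay buffer keeps every value it has seen, so a zip code entered twice is replayed twice. If only distinct zip codes should be refreshed, one option is to de-duplicate before buffering. The sketch below shows that idea in plain JavaScript with a Set, independent of RxJS. The names rememberZip, refreshAll, startRefreshing and the updateTemperature callback are hypothetical, and the actual API request is left out as an assumption.

```javascript
// Hypothetical helper names; the API request itself is not shown.
const knownZips = new Set();

// Call this for every valid zip code the input produces.
function rememberZip(zip) {
  knownZips.add(zip); // a Set silently ignores duplicates
}

// Visit every distinct zip code once, in insertion order.
function refreshAll(updateTemperature) {
  for (const zip of knownZips) {
    updateTemperature(zip);
  }
}

// Start the periodic refresh; returns the timer id so it can be cleared.
function startRefreshing(intervalMs, updateTemperature) {
  return setInterval(() => refreshAll(updateTemperature), intervalMs);
}

// Example: three inputs, one of them repeated.
['90210', '10001', '90210'].forEach(rememberZip);

const requested = [];
refreshAll(zip => requested.push(zip));
console.log(requested); // each distinct zip code once
```

In a page, startRefreshing(5000, zip => { /* request and render the temperature */ }) would run the loop every 5 seconds. Within RxJS itself, applying the distinct operator to zipcodeStream before it feeds the subject should have a similar effect.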
Upvotes: 1