Reputation: 4600
I have a stream of promises coming in and I don't want them to start until all the previous promises have resolved but while making use of Observables.
I'd try something like this:
let queue = Rx.Observable.create();
function addToQueue() {
// How do I add this to the queue.
}
function removeFromQueue() {
// How do i remove this item from the queue.
}
function getNewObservable() {
const queueItem = Rx.Observable.fromPromise(new Promise(resolve => {
setTimeout(() => {
removeFromQueue();
resolve();
}, 1000);
}));
addToQueue();
return queue;
}
getNewObservable().then(() => console.log('After 1000'));
getNewObservable().then(() => console.log('After 2000'));
// This one shouldn't run until the top two are finished
getNewObservable().then(() => console.log('After 3000'));
// This one would just wait for the previous one to resolve since
// the first two already resolved
setTimeout(() => {
getNewObservable().then(() => console.log('After 4000'));
}, 2500);
Upvotes: 1
Views: 860
Reputation: 96891
That's just the concatMap
operator. The only question is where you create the Promises:
const queue = Rx.Observable.create();
const results = queue.concatMap(p => createPromise(p));
results.subscribe(console.log);
queue.add(getNewObservable(...));
queue.add(getNewObservable(...));
queue.add(getNewObservable(...));
Upvotes: 1
Reputation: 16837
Use the concat
operator.
import { Component } from '@angular/core';
import { concat } from 'rxjs/observable/concat';
import { fromPromise } from 'rxjs/observable/fromPromise';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent {
constructor() {
const p1 = Promise.resolve(1);
const p2 = Promise.resolve(2);
const p3 = Promise.resolve(3);
const o1 = fromPromise(p1);
const o2 = fromPromise(p2);
const o3 = fromPromise(p3);
concat(o1, o2, o3)
.subscribe(console.log)
// logs 1, 2, 3
}
}
Upvotes: 0