Dustin Silk
Dustin Silk

Reputation: 4600

Queuing promises with an Observable

I have a stream of promises coming in and I don't want them to start until all the previous promises have resolved but while making use of Observables.

I'd try something like this:

let queue = Rx.Observable.create();

function addToQueue() {
  // How do I add this to the queue.
}

function removeFromQueue() {
  // How do i remove this item from the queue.
}

function getNewObservable() {
  const queueItem = Rx.Observable.fromPromise(new Promise(resolve => {
    setTimeout(() => {
      removeFromQueue();
      resolve();
    }, 1000);
  }));

  addToQueue();
  return queue;
}

getNewObservable().then(() => console.log('After 1000'));
getNewObservable().then(() => console.log('After 2000'));

// This one shouldn't run until the top two are finished
getNewObservable().then(() => console.log('After 3000'));

// This one would just wait for the previous one to resolve since
// the first two already resolved
setTimeout(() => {
  getNewObservable().then(() => console.log('After 4000'));
}, 2500);

Upvotes: 1

Views: 860

Answers (2)

martin
martin

Reputation: 96891

That's just the concatMap operator. The only question is where you create the Promises:

const queue = Rx.Observable.create();
const results = queue.concatMap(p => createPromise(p));

results.subscribe(console.log);

queue.add(getNewObservable(...));
queue.add(getNewObservable(...));
queue.add(getNewObservable(...));

Upvotes: 1

Tomasz Kula
Tomasz Kula

Reputation: 16837

Use the concat operator.

import { Component } from '@angular/core';
import { concat } from 'rxjs/observable/concat';
import { fromPromise } from 'rxjs/observable/fromPromise';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  constructor() {
    const p1 = Promise.resolve(1);
    const p2 = Promise.resolve(2);
    const p3 = Promise.resolve(3);

    const o1 = fromPromise(p1);
    const o2 = fromPromise(p2);
    const o3 = fromPromise(p3);

    concat(o1, o2, o3)
      .subscribe(console.log)
      // logs 1, 2, 3
  }
}

Live demo

Upvotes: 0

Related Questions