homam
homam

Reputation: 1975

RxJS equivalent of Async.js mapLimit

Async.js mapLimit and its family of <name>Limit functions basically work like a semaphore: they allow a limited number of tasks to run concurrently while the extra incoming tasks are added to a queue. The queue becomes a (cold? connected?) producer. The task runner drains an item from the queue as soon as a spot becomes available (one of its tasks finishes).

This way a limited number of concurrent tasks are always active.

How can I achieve a similar functionality in RxJS?

Upvotes: 5

Views: 419

Answers (1)

Brandon
Brandon

Reputation: 39182

A combination of defer and flatMapWithMaxConcurrent is the RxJs way to do it:

// returns a promise
function runSomeJob(input) { ... }

function runSomeJobObservable(input) {
    return Rx.Observable.defer(function () {
        return runSomeJob(input);
    });
}

var inputStream = // some Rx.Observable

// only allow 5 jobs to run concurrently
var outputStream = inputStream
    .flatMapWithMaxConcurrent(5, runSomeJobObservable);

ouputStream.subscribe(...);

Upvotes: 4

Related Questions