Keith
Keith

Reputation: 2861

Memory management with RxJS Observable.concatMap?

I'm new to RxJS and so still learning how to use the library. The documentation for concatMap gives the following warning:

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

This is a problem for me because I have a memory intensive but fast concatMap feeding a slow concatMap. It's set up like this:

let uploadObs = Observable.range(0, blockCount).concatMap(blockIndex => {
    // This part is fast and memory intensive. I'd like to use
    // a bounded buffer here or something similar to control
    // memory utilization

    let blockReaderObs = ...;
    // ... read a block from a large file object in blockReaderObs
    return blockReaderObs;
}).concatMap((blockData, index) => {
    // This part involves a POST so is much slower than reading a
    // file block
    let objFromBlockData = someTransformation(blockData);
    return this.http.post(someUrl, objFromBlockData)
        .map(transformResponse);
});

What is the right approach to dealing with this kind of problem in RxJS?

Upvotes: 2

Views: 824

Answers (2)

ckovacs
ckovacs

Reputation: 357

I had a similar issue using concatMap and concatAll. I realized that I was creating way too many observables that were waiting to be subscribed to.

This answer in a separate SO discussion was helpful in writing my own solution to limit how much I would put into a concatMap:

https://stackoverflow.com/a/40845089/181961

Upvotes: 1

Asti
Asti

Reputation: 12667

This is a classic producer-consumer problem. You can use backpressure operators to limit the number of elements being sent in for processing. See controlled streams.

Upvotes: 2

Related Questions