Reputation: 2861
I'm new to RxJS and still learning how to use the library. The documentation for concatMap gives the following warning:
Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
This is a problem for me because I have a memory intensive but fast concatMap feeding a slow concatMap. It's set up like this:
let uploadObs = Observable.range(0, blockCount).concatMap(blockIndex => {
    // This part is fast and memory intensive. I'd like to use
    // a bounded buffer here or something similar to control
    // memory utilization.
    let blockReaderObs = ...;
    // ... read a block from a large file object in blockReaderObs
    return blockReaderObs;
}).concatMap((blockData, index) => {
    // This part involves a POST, so it is much slower than
    // reading a file block.
    let objFromBlockData = someTransformation(blockData);
    return this.http.post(someUrl, objFromBlockData)
        .map(transformResponse);
});
What is the right approach to dealing with this kind of problem in RxJS?
Upvotes: 2
Views: 824
Reputation: 357
I had a similar issue using concatMap and concatAll. I realized that I was creating way too many observables that were waiting to be subscribed to.
This answer in a separate SO discussion was helpful when I wrote my own solution to limit how much I fed into concatMap:
https://stackoverflow.com/a/40845089/181961
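The core idea behind that kind of solution is to stop producing eagerly: instead of reading every block up front and letting the results pile up in concatMap's buffer, each block is read only when it is its turn to be uploaded. A minimal plain-JavaScript sketch of that shape, with hypothetical readBlock and uploadBlock functions standing in for the file read and the HTTP POST:

```javascript
// Hypothetical stand-ins for the memory-intensive read and the slow POST.
async function readBlock(blockIndex) {
  return { blockIndex, data: `block-${blockIndex}` };
}

async function uploadBlock(block) {
  return `uploaded ${block.blockIndex}`;
}

// Read each block on demand, only once the previous upload has
// finished. At most one block is held in memory at a time, so the
// fast producer can never run ahead of the slow consumer.
async function uploadAll(blockCount) {
  const responses = [];
  for (let i = 0; i < blockCount; i++) {
    const block = await readBlock(i);        // produced on demand
    responses.push(await uploadBlock(block)); // consumed before next read
  }
  return responses;
}
```

In RxJS terms, the equivalent move is to chain the block read and the POST inside a single sequential stage rather than two back-to-back concatMaps, so the inner read Observable is not even created until the pipeline is ready for it.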
Upvotes: 1
Reputation: 12667
This is a classic producer-consumer problem. You can use backpressure operators to limit the number of elements sent in for processing; see controlled streams.
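To make the producer-consumer framing concrete: a bounded buffer lets the fast reader run a fixed number of blocks ahead of the slow uploader, capping memory use while still allowing some read-ahead. A rough plain-JavaScript sketch of that mechanism (readBlock, uploadBlock, and the capacity value are illustrative assumptions, not part of the question's code):

```javascript
// Illustrative stand-ins for the fast read and the slow upload.
async function readBlock(i) { return `block-${i}`; }
async function uploadBlock(b) { return `uploaded ${b}`; }

// Producer-consumer pipeline with a bounded buffer: the reader may run
// at most `capacity` blocks ahead of the uploader.
async function pipeline(blockCount, capacity) {
  const buffer = [];
  const waiters = [];   // producer's "wait for free space" callbacks
  let maxBuffered = 0;

  const producer = (async () => {
    for (let i = 0; i < blockCount; i++) {
      while (buffer.length >= capacity) {
        await new Promise(resolve => waiters.push(resolve)); // buffer full
      }
      buffer.push(await readBlock(i));
      maxBuffered = Math.max(maxBuffered, buffer.length);
    }
  })();

  const results = [];
  const consumer = (async () => {
    for (let i = 0; i < blockCount; i++) {
      while (buffer.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 0)); // buffer empty
      }
      const block = buffer.shift();
      const wake = waiters.shift();
      if (wake) wake(); // tell the producer a slot is free
      results.push(await uploadBlock(block));
    }
  })();

  await Promise.all([producer, consumer]);
  return { results, maxBuffered };
}
```

The same bound applies in the RxJS version of the problem: whatever mechanism you use, the reader must be prevented from subscribing to the next block read until the uploader has drained enough of the queue.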
Upvotes: 2