Reputation: 7049
Let's say I have a rather typical use of rx that does requests every time some change event comes in (I write this in the .NET style, but I'm really thinking of Javascript):
myChanges
.Throttle(200)
.Select(async data => {
await someLongRunningWriteRequest(data);
})
If the request takes longer than 200ms, there's a chance a new request begins before the old one is done - potentially even that the new request is completed first.
How to synchronize this?
Note that this has nothing to do with multithreading, and that's the only thing I could find information about when googling for "rx synchronization" or something similar.
Upvotes: 0
Views: 45
Reputation: 4345
You could use concatMap operator which will start working on the next item only after previous was completed.
Here is an example where events$
appear with the interval of 200ms and then processed successively with a different duration:
const { Observable } = Rx;
const fakeWriteRequest = data => {
console.log('started working on: ', data);
return Observable.of(data).delay(Math.random() * 2000);
}
const events$ = Observable.interval(200);
events$.take(10)
.concatMap(i => fakeWriteRequest(i))
.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
Upvotes: 1