John

Reputation: 7049

RX: Synchronizing promises

Let's say I have a fairly typical use of Rx that issues a request every time a change event comes in (written here in the .NET style, although I'm really thinking of JavaScript):

myChanges
    .Throttle(200)
    .Select(async data => {
        await someLongRunningWriteRequest(data);
    })

If a request takes longer than 200ms, a new request can begin before the old one is done, and the new request might even complete first.
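To make the race concrete, here is a minimal plain-JavaScript sketch without Rx. `fakeWrite`, `demoRace`, and the delays are hypothetical stand-ins for someLongRunningWriteRequest; the first request is deliberately made slower than the second.

```javascript
// Hypothetical stand-in for someLongRunningWriteRequest:
// resolves with `data` after `ms` milliseconds.
const fakeWrite = (data, ms) =>
  new Promise(resolve => setTimeout(() => resolve(data), ms));

// Start both requests without waiting for the earlier one to finish.
const demoRace = () => {
  const completed = [];
  const first = fakeWrite('first', 100).then(d => completed.push(d));
  const second = fakeWrite('second', 10).then(d => completed.push(d));
  return Promise.all([first, second]).then(() => completed);
};

// The slower 'first' write finishes after 'second'.
demoRace().then(order => console.log(order));
```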

How can I synchronize this?

Note that this has nothing to do with multithreading, which is the only thing I could find information about when googling for "rx synchronization" and similar terms.

Upvotes: 0

Views: 45

Answers (1)

Oles Savluk

Reputation: 4345

You could use the concatMap operator, which starts working on the next item only after the previous one has completed.

Here is an example in which events$ emits every 200ms and each item is then processed one after another, each taking a different (random) duration:

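// Uses the RxJS 5 API (operators as methods on Observable).
const { Observable } = Rx;

// Simulates a write request that takes up to 2 seconds to complete.
const fakeWriteRequest = data => {
  console.log('started working on: ', data);
  return Observable.of(data).delay(Math.random() * 2000);
};

// Emits 0, 1, 2, ... every 200ms.
const events$ = Observable.interval(200);

events$.take(10)
  // concatMap subscribes to the next request only after the previous one completes.
  .concatMap(i => fakeWriteRequest(i))
  .subscribe(e => console.log(e));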
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
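For intuition about what concatMap guarantees here, the same one-at-a-time behaviour can be sketched without Rx by chaining promises. This is only an illustration, not how RxJS implements the operator; `createSerialQueue`, `fakeWrite`, and `demoSerial` are hypothetical names, and the delays are arbitrary.

```javascript
// Hypothetical stand-in for a write request:
// resolves with `data` after `ms` milliseconds.
const fakeWrite = (data, ms) =>
  new Promise(resolve => setTimeout(() => resolve(data), ms));

// Returns an enqueue function that starts each task
// only after the previously enqueued task has settled.
const createSerialQueue = () => {
  let tail = Promise.resolve();
  return task => {
    const result = tail.then(() => task());
    // Keep the chain going even if a task rejects.
    tail = result.catch(() => {});
    return result;
  };
};

const demoSerial = () => {
  const enqueue = createSerialQueue();
  const completed = [];
  const writes = [
    enqueue(() => fakeWrite('first', 100)).then(d => completed.push(d)),
    enqueue(() => fakeWrite('second', 10)).then(d => completed.push(d)),
  ];
  return Promise.all(writes).then(() => completed);
};

// 'second' is faster, but it only starts once 'first' has completed.
demoSerial().then(order => console.log(order));
```

With a real request, the task passed to `enqueue` would be something like `() => someLongRunningWriteRequest(data)`, assuming that function returns a promise.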

Upvotes: 1
