Marlon

Reputation: 2139

RxJS await ajax response before processing next buffered chunk

I'm using RxJS with a basic Subject object, which receives inputs from various places at unknown times. These inputs need to be stacked up and sent to the server asynchronously - but I need to wait until that ajax request completes before attempting to process the next buffered set of inputs.

flatMapLatest is not suitable in my case as each response has to be handled uniquely.

I could pause the buffer while the ajax request is in process, but that means I would lose inputs, which is unacceptable.

I see that the buffer opening object is just an observable. Could I somehow combine a timer observable with a custom observable that I fire when my ajax request completes, and suspend the timer one while the ajax request is in flight?

Sorry I don't have any code to show yet, I'm still at the design stage really.

Is what I want to do feasible or am I going about it incorrectly?

EDIT

This is the code I have so far, and it appears to do what I want, but I am still open to improvements.

var bufferedIntervalPauser = new Rx.Subject<boolean>();
var pausableInterval = Rx.Observable.interval(500).pausable(bufferedIntervalPauser);
var purStream = this.updateReqestSubject.asObservable()
    .buffer(pausableInterval)
    .where(b => (b || []).length > 0)

var purSub = purStream.subscribe(
    next => {
        bufferedIntervalPauser.onNext(false); // pause the buffer window

        SomeAjaxMethod(next, {
            success: res => {
                this.HandleResult(res);
            },
            always: () => {
                // when the ajax completes, resume the buffer
                bufferedIntervalPauser.onNext(true);
            }
        });
    },
    err => {
        console.error(err);
    });

// start
bufferedIntervalPauser.onNext(true);

Upvotes: 0

Views: 743

Answers (1)

Lee Campbell

Reputation: 10783

You should be able to use just the buffer operator. As @Enigmativity points out, Rx has a serialized protocol, so your handler won't ever run concurrently, i.e. it won't be called while it is still running from a previous call. If, however, your handler itself calls a non-blocking/async method, then nothing stops Rx from calling your handler again before the async response has arrived. In this case you may want to wrap your async call in an observable sequence. Then you can use an operator like concat to ensure serialization of multiple observable sequences.
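To illustrate the point about non-blocking calls, here is a small sketch in plain TypeScript (no Rx). The names fakeAjax, handler, log and completions are hypothetical stand-ins, not part of the question's code. The handler calls never overlap, yet the second request is sent before the first one has completed:

```typescript
// Hypothetical stand-in for a non-blocking ajax call: it records the request
// and returns immediately; the "response" only arrives when its completion runs.
const log: string[] = [];
const completions: Array<() => void> = [];

function fakeAjax(batch: number[]): void {
    log.push(`sent ${batch.join(",")}`);
    completions.push(() => log.push(`done ${batch.join(",")}`));
}

// The handler never runs concurrently with itself, but it returns as soon as
// the request has been sent, not when the response has arrived.
function handler(batch: number[]): void {
    fakeAjax(batch);
}

handler([1, 2]);
handler([3]); // the second request goes out before the first has completed

// Simulate both responses arriving later.
completions.forEach(complete => complete());
// log: "sent 1,2", "sent 3", "done 1,2", "done 3"
```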

EDIT: Your issue here is that you are doing async work in your subscription. What you need to do is treat your SomeAjaxMethod as an observable sequence and then just use composition.

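// Untested sketch, as I don't have an IDE at hand; uses RxJS 4 operator names
// and assumes SomeAjaxMethod takes the same callbacks as in the question
this.updateReqestSubject.asObservable()
   .bufferWithTime(500)
   .where(buffer => buffer.length > 0) // skip empty windows
   .select(buffer => Rx.Observable.create(observer => {
       // Cold observable: the request is only sent once concatAll subscribes to it
       SomeAjaxMethod(buffer, {
           success: res => observer.onNext(res),
           always: () => observer.onCompleted()
       });
   }).catch(Rx.Observable.empty())) // swallow errors to match the example
   .concatAll() // flatten IObservable<IObservable<>> into IObservable<>, one inner sequence at a time
   .subscribe(
     res => this.HandleResult(res),
     err => {
       console.error(err);
     });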
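If it helps to see what the concatenation step buys you, here is a rough dependency-free sketch of the same idea in plain TypeScript. SerialBatcher and the Send type are hypothetical names invented for this illustration, and the callback-style sender only mirrors the shape of SomeAjaxMethod. One difference from the Rx version: inputs that arrive while a request is in flight are merged into a single next batch instead of being kept as separate 500 ms chunks.

```typescript
// Hypothetical callback-style sender, loosely mirroring SomeAjaxMethod.
type Send<T, R> = (batch: T[], done: (result: R) => void) => void;

class SerialBatcher<T, R> {
    private pending: T[] = [];  // inputs waiting for the next request
    private inFlight = false;   // true while a request is outstanding
    private send: Send<T, R>;
    private handle: (result: R) => void;

    constructor(send: Send<T, R>, handle: (result: R) => void) {
        this.send = send;
        this.handle = handle;
    }

    // Queue an input; nothing is dropped while a request is in flight.
    push(item: T): void {
        this.pending.push(item);
    }

    // Call on every timer tick, e.g. from setInterval(() => batcher.flush(), 500).
    flush(): void {
        if (this.inFlight || this.pending.length === 0) {
            return;
        }
        const batch = this.pending;
        this.pending = [];
        this.inFlight = true;
        this.send(batch, result => {
            this.inFlight = false;
            this.handle(result);
            this.flush(); // send whatever accumulated during the request
        });
    }
}
```

Usage would be to call push wherever the subject currently receives onNext, and flush from a timer; the next request is never sent until the previous one has reported completion.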

Upvotes: 1
