pop

Reputation: 3734

Dynamically adding to the queue of RXJS Observables

The objective is to dynamically add HTTP requests to a queue and handle the whole queue as a single request. Let me show the status quo first.

component.ts

public ngOnInit(): void {
    this.service.touched
        .pipe( 
             filter(res => !!res),
             takeUntil(this.ngUnsubscribe)
         )
        .subscribe(() => this.onSave());
}

public onSave(): void {
    this.busy = true;
    this.service.save()
        .pipe(finalize(() => this.busy=false))
        .subscribe(() => this.isInSync = true);
}

service.ts

public touched: BehaviorSubject<boolean> = new BehaviorSubject(false);
public saved: BehaviorSubject<boolean> = new BehaviorSubject(false);

public save(): Observable<ApiResponse> {

    const data = this.getDirtyData();

    const payload = data.map(( d: Data) => {
       /** [...] */
    });

    const reqOptions: HttpRequest<any> = this.api.setRequestOptions( '/url', payload );
    return this.api.apiHttpRequest(reqOptions)
        .pipe(
            map( resp => resp.data),
            tap( resp => this.saved.next(resp))
        );

}

This is what it looks like now. It works fine, but I need to be able to queue those HTTP requests if the user triggers onSave multiple times, so that the component finalizes the Observable only once. As it stands, every call to onSave creates a new subscription, so this.busy and this.isInSync are set prematurely.

Any suggestions on how to solve this in an elegant way?

Upvotes: 2

Views: 1614

Answers (2)

maxime1992

Reputation: 23813

When you do this:

public onSave(): void {
    this.busy = true;
    this.service.save()
        .pipe(finalize(() => this.busy=false))
        .subscribe(() => this.isInSync = true); // <-- subscribing here
}

You can think of it as a code smell. Calling subscribe is where the reactive programming ends, so it should not happen until the very end of the chain you're building. In this case, because you subscribe inside a method that is called multiple times, you lose the ability to queue the requests as you'd like.

I think what you're trying to achieve can be represented with the following code (mix of mock for the HTTP call and actual stream logic):

const mockHttpRequest = (someParam: any) =>
  of(`Result: ${someParam}`).pipe(delay(1000));

const change$$ = new Subject<any>();

const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));

const isSyncing$ = combineLatest([
  change$$.pipe(
    map((_, index) => index),
    startWith(-1)
  ),
  queue$.pipe(
    map((_, index) => index),
    startWith(-1)
  )
]).pipe(
  map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
  distinctUntilChanged()
);

isSyncing$.subscribe(console.log);

A little bit of explanation on this:

const mockHttpRequest = (someParam: any) =>
  of(`Result: ${someParam}`).pipe(delay(1000));

This simply mocks an HTTP call with a delay.

const change$$ = new Subject<any>();

The subject on which you should call next whenever a change happens.

const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));

As its name says: This is the queue for all the HTTP calls. They'll happen one by one and won't be done in parallel.
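To see the one-by-one behaviour without waiting on timers, here is a minimal sketch. The names fakeRequest, respond, inFlight, started and results are made up for the illustration, and the "HTTP call" is just a Subject that we answer manually:

```typescript
import { Observable, Subject } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const started: string[] = [];
const results: string[] = [];
const inFlight = new Map<string, Subject<string>>();

// Hypothetical stand-in for an HTTP call: the response arrives only when
// we push it manually, which makes the ordering easy to observe.
const fakeRequest = (payload: string): Observable<string> => {
  started.push(payload);
  const response$ = new Subject<string>();
  inFlight.set(payload, response$);
  return response$;
};

// Hypothetical helper that simulates the server answering a request.
const respond = (payload: string): void => {
  const response$ = inFlight.get(payload);
  if (response$) {
    response$.next(`saved ${payload}`);
    response$.complete();
  }
};

const change$$ = new Subject<string>();
change$$.pipe(concatMap(fakeRequest)).subscribe(r => results.push(r));

change$$.next('a'); // request 'a' starts immediately
change$$.next('b'); // 'b' is buffered while 'a' is still in flight
const startedWhileWaiting = started.slice(); // only 'a' has started so far

respond('a'); // 'a' completes, so 'b' starts now
respond('b');

console.log(startedWhileWaiting, started, results);
```

The second request is not even created until the first one completes, which is what makes concatMap behave like a queue here.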

const isSyncing$ = combineLatest([
  change$$.pipe(
    map((_, index) => index),
    startWith(-1)
  ),
  queue$.pipe(
    map((_, index) => index),
    startWith(-1)
  )
]).pipe(
  map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
  distinctUntilChanged()
);

This observable tells you the current status of the app: whether it's syncing some data or not. To do this, we compare how many items have successfully come out of the HTTP queue with how many save attempts were made. If the two numbers are the same, we know we're in sync; otherwise we're still syncing.

The distinctUntilChanged is necessary because if we emit, say, 2 changes in quick succession, so that the second one is sent before the first one has come back, isSyncing$ would emit true several times before emitting false once they're both done. With distinctUntilChanged we ensure that it emits true only once and false only once in that case.
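A minimal sketch of that deduplication in isolation. The hard-coded sequence is an assumption: it is what the changeIndex !== queueIndex comparison should yield for two back-to-back changes (initial state, change 1, change 2, response 1, response 2), and rawStatus$ is a name made up for the example:

```typescript
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Assumed raw values of `changeIndex !== queueIndex` before deduplication:
// initial state, change 1, change 2, response 1, response 2.
const rawStatus$ = of(false, true, true, true, false);

const deduped: boolean[] = [];
rawStatus$
  .pipe(distinctUntilChanged()) // drop values equal to the previous one
  .subscribe(status => deduped.push(status));

console.log(deduped); // [false, true, false]
```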

Finally to test this we can do the following:

// simulate events happening when a user types at different times
// on the document to test our code
change$$.next('This is some initial text...');

setTimeout(() => {
  change$$.next('This is some initial text... And now with some other edit...');
}, 1500);

setTimeout(() => {
  change$$.next(
    'This is some initial text... And now with some other edit... And a quick one before previous is finished processing...'
  );
}, 1500);

setTimeout(() => {
  change$$.next(
    'This is some initial text... And now with some other edit... And a quick one before previous is finished processing... And a last one'
  );
}, 4500);

and it should output

false
true
false
true
false
true
false

Note that the two changes happening at the same time (1500) don't make it emit twice. The status isSyncing$ simply stays true for a bit longer than for the others, as the 2 calls are made one after another:

false
true
false
true <--- this one
false
true
false

If you want to play with this, here's a stackblitz: https://stackblitz.com/edit/rxjs-vtnk6z?devtoolsheight=60
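Applied to the component from the question, the same idea could look roughly like the sketch below. This is a simplification, not a drop-in replacement: SaveController is a made-up class standing in for the component, it tracks a plain pending counter instead of the combineLatest comparison above, and it assumes every save emits exactly one response and never errors. In a real Angular component you would also end the subscription on destroy, for example with takeUntil as in the question.

```typescript
import { Observable, Subject } from 'rxjs';
import { concatMap, tap } from 'rxjs/operators';

// Hypothetical stand-in for the component in the question.
class SaveController {
  busy = false;
  isInSync = true;
  private pending = 0;
  private readonly save$$ = new Subject<void>();

  constructor(save: () => Observable<unknown>) {
    // Subscribe once, here, instead of once per onSave() call.
    this.save$$
      .pipe(
        tap(() => {
          this.pending++;
          this.busy = true;
          this.isInSync = false;
        }),
        concatMap(() => save()) // saves run one after another
      )
      .subscribe(() => {
        // Assumes exactly one emission per save.
        this.pending--;
        this.busy = this.pending > 0;
        this.isInSync = this.pending === 0;
      });
  }

  onSave(): void {
    this.save$$.next(); // only enqueue, never subscribe here
  }
}

// Usage with a fake save whose responses we deliver manually.
const responses: Subject<string>[] = [];
const controller = new SaveController(() => {
  const response$ = new Subject<string>();
  responses.push(response$);
  return response$;
});

const busyLog: boolean[] = [];
controller.onSave();
controller.onSave();
busyLog.push(controller.busy); // both saves pending

responses[0].next('ok');
responses[0].complete();
busyLog.push(controller.busy); // second save still pending

responses[1].next('ok');
responses[1].complete();
busyLog.push(controller.busy); // queue drained

console.log(busyLog); // [true, true, false]
```

The busy flag flips back to false only once, after the last queued save has answered, which is the behaviour the question asks for.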

Upvotes: 1

Fatih Ersoy

Reputation: 739

Depending on your design, combineLatest or forkJoin may be good choices for you. For example, if you have more than one service call and you want to treat them as a single request:

forkJoin({
  serviceCall1: this.httpService1(input1), // input1: string, ...
  serviceCall2: this.httpService2(input2)  // input2: string, ...
}).subscribe();

forkJoin waits for all the requests inside it to complete and then emits the last value of each, once. combineLatest emits every time any one of the sources emits, as soon as each source has emitted at least once.
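A small sketch of the difference, using two Subjects (a$ and b$, names made up for the example) in place of real service calls so the timing can be controlled by hand:

```typescript
import { Subject, combineLatest, forkJoin } from 'rxjs';

// Hypothetical sources standing in for two service calls.
const a$ = new Subject<string>();
const b$ = new Subject<string>();

const forkJoinLog: string[] = [];
const combineLatestLog: string[] = [];

forkJoin({ a: a$, b: b$ }).subscribe(({ a, b }) => forkJoinLog.push(`${a}+${b}`));
combineLatest([a$, b$]).subscribe(([a, b]) => combineLatestLog.push(`${a}+${b}`));

a$.next('a1'); // nothing yet: b$ has not emitted
b$.next('b1'); // combineLatest emits a1+b1
a$.next('a2'); // combineLatest emits a2+b1

a$.complete();
b$.complete(); // forkJoin emits once, with the last values: a2+b1

console.log(forkJoinLog);      // ['a2+b1']
console.log(combineLatestLog); // ['a1+b1', 'a2+b1']
```

With HttpClient-style observables, which emit once and complete, the two would produce the same single value, so the difference mainly matters for sources that emit more than once.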

Also, RxJS 7 adds connect and connectable, which may solve this better, but RxJS 7 is not compatible with older versions of Angular.

For further information, see the RxJS documentation for forkJoin and combineLatest.

Upvotes: 0
