Reputation: 3734
The objective is to queue HTTP requests dynamically and handle the whole queue as a single request. Let me show the status quo first.
component.ts
public ngOnInit(): void {
this.service.touched
.pipe(
filter(res => !!res),
takeUntil(this.ngUnsubscribe)
)
.subscribe(() => this.onSave());
}
public onSave(): void {
this.busy = true;
this.service.save()
.pipe(finalize(() => this.busy=false))
.subscribe(() => this.isInSync = true);
}
service.ts
public touched: BehaviorSubject<boolean> = new BehaviorSubject(false);
public saved: BehaviorSubject<boolean> = new BehaviorSubject(false);
public save(): Observable<ApiResponse> {
const data = this.getDirtyData();
const payload = data.map(( d: Data) => {
/** [...] */
});
const reqOptions: HttpRequest<any> = this.api.setRequestOptions( '/url', payload );
return this.api.apiHttpRequest(reqOptions)
.pipe(
map( resp => resp.data),
tap( resp => this.saved.next(resp))
);
}
This is what it looks like now. It works fine, but I need to be able to queue those HTTP requests when the user triggers onSave multiple times, so that the component finalizes the Observable only once. As it stands, the component subscribes to the observable multiple times and therefore sets this.busy and this.isInSync prematurely.
Any suggestions on how to solve this in an elegant way?
Upvotes: 2
Views: 1614
Reputation: 23813
When you do this:
public onSave(): void {
this.busy = true;
this.service.save()
.pipe(finalize(() => this.busy=false))
.subscribe(() => this.isInSync = true); // <-- this subscribe
}
You can think of it as a code smell. Calling subscribe is where the reactive programming ends, so it should not happen until the very end of the chain you're building. In this case, because you subscribe within a method that is called multiple times, you lose the ability to queue the requests the way you'd like.
I think what you're trying to achieve can be represented with the following code (mix of mock for the HTTP call and actual stream logic):
const mockHttpRequest = (someParam: any) =>
of(`Result: ${someParam}`).pipe(delay(1000));
const change$$ = new Subject<any>();
const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));
const isSyncing$ = combineLatest([
change$$.pipe(
map((_, index) => index),
startWith(-1)
),
queue$.pipe(
map((_, index) => index),
startWith(-1)
)
]).pipe(
map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
distinctUntilChanged()
);
isSyncing$.subscribe(console.log);
A little bit of explanation on this:
const mockHttpRequest = (someParam: any) =>
of(`Result: ${someParam}`).pipe(delay(1000));
Simply mock an HTTP call with a delay
const change$$ = new Subject<any>();
The subject that you should call whenever a change happens
const queue$ = change$$.pipe(concatMap(change => mockHttpRequest(change)));
As its name says: This is the queue for all the HTTP calls. They'll happen one by one and won't be done in parallel.
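To make the "one by one" behaviour concrete, here is a minimal sketch. The fakeRequest and resolve helpers are hypothetical and only exist for this illustration: each fake request stays open until it is resolved by hand, so it is easy to see that the second request does not start before the first one completes.

```typescript
import { Subject, defer } from "rxjs";
import { concatMap } from "rxjs/operators";

const started: string[] = [];
const finished: string[] = [];
const inFlight = new Map<string, Subject<string>>();

// Hypothetical fake request: records when it starts and stays open until resolve() is called.
const fakeRequest = (payload: string) =>
  defer(() => {
    started.push(payload);
    const response$ = new Subject<string>();
    inFlight.set(payload, response$);
    return response$;
  });

// Hypothetical helper that emits the response for a payload and completes the request.
const resolve = (payload: string): void => {
  const response$ = inFlight.get(payload)!;
  response$.next(`saved ${payload}`);
  response$.complete();
};

const change$$ = new Subject<string>();
change$$.pipe(concatMap(fakeRequest)).subscribe(result => finished.push(result));

change$$.next("a"); // request "a" starts immediately
change$$.next("b"); // "b" is buffered because "a" is still in flight
const startedWhileFirstPending = [...started];

resolve("a"); // "a" completes, only now does "b" start
const startedAfterFirstDone = [...started];

resolve("b");
```

Here concatMap buffers "b" until the inner observable for "a" completes, which is the queueing behaviour the question asks for.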
const isSyncing$ = combineLatest([
change$$.pipe(
map((_, index) => index),
startWith(-1)
),
queue$.pipe(
map((_, index) => index),
startWith(-1)
)
]).pipe(
map(([changeIndex, queueIndex]) => changeIndex !== queueIndex),
distinctUntilChanged()
);
This observable will let you know the current status of the app: whether it's syncing some data or not. To do this, we compare how many items have successfully come out of the HTTP queue with how many save attempts were made. If the two numbers are the same, we know we're in sync; otherwise we're syncing.
The distinctUntilChanged is necessary because if we emit, say, 2 changes in quick succession, so that the second one is sent before the first one comes back, the observable isSyncing$ would emit true several times in a row before emitting false once they're both done. With distinctUntilChanged we ensure that it emits true only once and false only once in that case.
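As a small illustration, the sketch below replays the index pairs that combineLatest would produce for two changes fired back to back (the pairs are written out by hand here, which is an assumption made for the example) and compares the status stream with and without distinctUntilChanged.

```typescript
import { from } from "rxjs";
import { distinctUntilChanged, map } from "rxjs/operators";

// [changeIndex, queueIndex] pairs, written out by hand for this illustration:
// initial state, change 1, change 2, response 1, response 2.
const indexPairs: Array<[number, number]> = [
  [-1, -1],
  [0, -1],
  [1, -1],
  [1, 0],
  [1, 1],
];

const rawStatus: boolean[] = [];
const dedupedStatus: boolean[] = [];

// Same comparison as in isSyncing$: syncing while the two indexes differ.
const status$ = from(indexPairs).pipe(
  map(([changeIndex, queueIndex]) => changeIndex !== queueIndex)
);

status$.subscribe(status => rawStatus.push(status));
status$.pipe(distinctUntilChanged()).subscribe(status => dedupedStatus.push(status));

// rawStatus:     false, true, true, true, false
// dedupedStatus: false, true, false
```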
Finally to test this we can do the following:
// simulate events happening when a user types at different times
// on the document to test our code
change$$.next('This is some initial text...');
setTimeout(() => {
change$$.next('This is some initial text... And now with some other edit...');
}, 1500);
setTimeout(() => {
change$$.next(
'This is some initial text... And now with some other edit... And a quick one before previous is finished processing...'
);
}, 1500);
setTimeout(() => {
change$$.next(
'This is some initial text... And now with some other edit... And a quick one before previous is finished processing... And a last one'
);
}, 4500);
and it should output
false
true
false
true
false
true
false
Note that the two changes happening at the same time (1500) don't make it emit twice. The status isSyncing$ simply stays true a bit longer than for the others, because the 2 calls are made one after another:
false
true
false
true <--- this one
false
true
false
If you want to play with this, here's a stackblitz: https://stackblitz.com/edit/rxjs-vtnk6z?devtoolsheight=60
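To connect this back to the question, one possible way to package the same idea is sketched below. createSaveQueue, requestSave, saved$ and busy$ are hypothetical names, and the save argument stands in for the question's service.save(). The component would call requestSave() from onSave() and subscribe once to busy$ instead of subscribing on every click. The fake responses are completed by hand so that the run is deterministic.

```typescript
import { Observable, Subject, combineLatest } from "rxjs";
import { concatMap, distinctUntilChanged, map, share, startWith } from "rxjs/operators";

// Hypothetical helper: `save` stands in for the question's service.save().
function createSaveQueue<T>(save: () => Observable<T>) {
  const saveRequest$$ = new Subject<void>();

  // One request at a time; share() keeps a single queue even with several subscribers.
  const saved$ = saveRequest$$.pipe(
    concatMap(() => save()),
    share()
  );

  // Same index comparison as isSyncing$ above.
  const busy$ = combineLatest([
    saveRequest$$.pipe(map((_, index) => index), startWith(-1)),
    saved$.pipe(map((_, index) => index), startWith(-1)),
  ]).pipe(
    map(([requested, completed]) => requested !== completed),
    distinctUntilChanged()
  );

  return { requestSave: () => saveRequest$$.next(), saved$, busy$ };
}

// Fake responses that are completed manually (illustration only).
const responses: Subject<string>[] = [];
const queue = createSaveQueue(() => {
  const response$ = new Subject<string>();
  responses.push(response$);
  return response$;
});

const busyLog: boolean[] = [];
queue.busy$.subscribe(busy => busyLog.push(busy));

queue.requestSave(); // first click: request 1 starts
queue.requestSave(); // second click: queued behind request 1

responses[0].next("ok");
responses[0].complete(); // request 2 starts only now
responses[1].next("ok");
responses[1].complete();

// busyLog: false, true, false (a single busy period for both clicks)
```

One caveat with this sketch: if save() errors, the error terminates the whole queue, so a real implementation would probably want to handle errors inside the concatMap callback.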
Upvotes: 1
Reputation: 739
Depending on your design, combineLatest or forkJoin may be good choices for you. For example, if you have more than one service call and you want to treat them as a single request:
forkJoin({
serviceCall1: this.httpService1(/* input1: string, ... */),
serviceCall2: this.httpService2(/* input2: string, ... */)
}).subscribe();
forkJoin waits for all the requests to complete and then emits their last values once. combineLatest emits whenever any of the sources emits, as soon as every source has emitted at least once.
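The difference can be seen in a minimal sketch that uses two Subjects as stand-ins for the service calls (first$ and second$ are hypothetical names, not part of any API):

```typescript
import { Subject, combineLatest, forkJoin } from "rxjs";

// Hypothetical sources standing in for two service calls.
const first$ = new Subject<number>();
const second$ = new Subject<string>();

const combined: Array<[number, string]> = [];
const joined: Array<{ first: number; second: string }> = [];

combineLatest([first$, second$]).subscribe(pair => combined.push(pair));
forkJoin({ first: first$, second: second$ }).subscribe(result => joined.push(result));

first$.next(1); // combineLatest: nothing yet, second$ has not emitted
second$.next("a"); // combineLatest: [1, "a"]
first$.next(2); // combineLatest: [2, "a"]
const joinedBeforeComplete = joined.length; // forkJoin has emitted nothing so far

first$.complete();
second$.complete(); // forkJoin: { first: 2, second: "a" }
```

Since a typical HTTP request emits a single value and then completes, both operators tend to behave similarly for plain requests; the difference matters mostly for sources that emit more than once.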
Also, RxJS 7 adds connect and connectable, which may solve this better, but RxJS 7 is not compatible with older versions of Angular.
For further information: forkJoin, combineLatest
Upvotes: 0