Reputation: 222
I'm trying to implement some sort of queueing with support for cancelling (or not) an ongoing HTTP request.
In my head, it looks something like this, but I can't seem to find the "propper tools" for the job :/
My first idea was to use a takeUntil together with the concatMap and somehow cancel the ongoing request. But then, how can I actually check the items of the queue? It feels like I'm missing a handy operator.
// in the service
queue.pipe(
tap(()=> { how to check and trigger sameItem$.next() })
// using concatMap to process every item from the queue sequentially
concatMap((item) => this.httpClient.get(item)).pipe(takeUntil(this.sameItem$))
).subscribe((x)=> {console.log('response', x)})
// In the component
loadMore(item){
this.queueService.queue.next(item);
}
I was also thinking about using switchMap or concatMap based on "is the item already in the queue?" but I don't even know if this makes any sense :)
Also don't mind if the solution goes on a different direction, as long as I can leverage the power of RxJS!
Upvotes: 1
Views: 208
Reputation: 2284
Assuming you have some kind of id
in the item, you could do something like this.
queue.pipe(
groupBy((item) => item.id), // split queue values into independent obs based on grouping key
mergeMap( // process each group obs in parallel
(group$) => group$.pipe(
switchMap((item) => this.httpClient.get(item)) //cancel prev req in the group if still running
)
)
)
.subscribe(console.log);
cheers
Upvotes: 1