Reputation: 89
I am working on an angular project that needs to load up pages and then display them one by one / two by two.
As per this article and some other sources, subscribing in services is almost never necessary. So is there a way to rewrite this in pure reactive style using RxJS operators?
Here's what I have (simplified) :
export class NavigationService {
private pages: Page[] = [];
private mode = Mode.SinglePage;
private index = 0;
private currentPages = new BehaviorSubject<Page[]>([]);
constructor(
private pageService: PageService,
private view: ViewService,
) {
this.pageService.pages$.subscribe(pages => {
this.setPages(pages);
});
this.view.mode$.subscribe(mode => {
this.setMode(mode);
});
}
private setPages(pages: Page[]) {
this.pages = pages;
this.updateCurrentPages();
}
private setMode(mode: Mode) {
this.mode = mode;
this.updateCurrentPages();
}
private updateCurrentPages() {
// get an array of current pages depending on pages array, mode & index
this.currentPages.next(...);
}
public goToNextPage() {
this.index += 1;
this.updateCurrentPages();
}
public get currentPages$() {
return this.currentPages.asObservable();
}
}
I've tried multiple solutions and didn't manage to get it right. The closest I got was using scan()
, but it always reset my accumulated value when the outer observables (pages, mode) got updated.
Any help is appreciated, thanks !
Upvotes: 0
Views: 901
Reputation: 15711
Let's first detail what you are doing:
pageService.pages$
and view.mode$
All this can be done in a simple pipeline. You'd need to include the index as an observable (behaviour subject in our case) to react to that change too.
Use combineLatest, this will subscribe to all observables we want, and trigger the pipe WHEN ALL have fired once, and every time one changes afterwards. You may want to use .pipe(startWith("something"))
on observables that should have a default value so your observable pipe triggers asap.
CombineLatest will then provide an object as value, with each value in the object key passed when created. Here pages, mode and index. I've used a switchMap to demo here if updateCurrentPages passes an observable, but you could use a map if there is no async task to be done.
export class NavigationService {
readonly currentPages$:Observable<Pages[]>;
constructor(
private pageService: PageService,
private view: ViewService,
) {
this.paginator = new Paginator(this.pageService.pages$);
this.currentPages$ = combineLatest({
pages:this.pageService.pages$,
mode:this.view.mode$,
index:this.this.paginator.pageChange$
}).pipe(
switchMap(({pages,mode,index})=>{
return this.updateCurrentPages(pages,mode);
}),
);
}
private updateCurrentPages() {
// get an array of current pages depending on pages array, mode & index
this.currentPages.next(...);
}
public goToNextPage() {
this.paginator.next();
}
}
class Paginator{
pageChange$ = combineLatest({
total:this.pages$.pipe(map(pages=>pages.length)),
wanted:this.pageMove$}).pipe(map({total,wanted}=>{
// Make sure it is between 0 and maximum according to pages.
return Math.max(Math.min(total-1,wanted),0);
}),
// Do not emit twice the same page (pressing next when already at last)
dinstinctUntilChanged());
);
pageMove$ = new BehaviorSubject<number>(0);
constructor(pages$: Observable<Pages[]>){
}
next(){
this.pageMove$.next(this.pageMove$.value()+1);
}
previous(){
this.pageMove$.next(this.pageMove$.value()-1);
}
to(i:number){
this.pageMove$.next(i);
}
}
Beware tough about stating that subscribe
is never needed. You may want to subscribe to some events for some reason. It is just that combining everything in a pipeline makes things easier to handle... In the example above, the observables will be unsubscribed to when the consumer of your service unsubscribes to currentPages$. So one thing less to handle.
Also, note that if multiple consumers subscribe to this service's currentPages$ the pipeline will be duplicated and unecessary work will be done, once for each subscriber. While this MAY be good, you might want to have everyone subscribe to the same "final" observable. This is easily do-able by adding share() or shareReplay(1) at the end of your pipeline. Share will make sure the same observable pipeline will be used for the new subscriber, and they will receive new values starting from then. Using shareReplay(1), will do the same but also emit the latest value directly on subscribe (just like BehaviourSubject) the 1 as parameter is indeed the number of replays to send out...
Hope this helps! When you master the RxJS you'll see that things will get easier and easier (see the difference in code amount!) but getting the hang of it take a little bit of time. Do not worry, just perseverate you'll get there. (Hint to get better, using outside variables/properties are the evil of handling pipelines)
Upvotes: 1
Reputation: 20654
You can use merge to create reducer functions from observables. These functions will update part of a state maintained by the service. They are past along to the scan operator which will update the prior state from the reducer. After the reducer is run, currentPages is set on the new state and that new state is returned.
export class NavigationService {
private readonly relativePageChangeSubject = new Subject<number>();
readonly state$ = merge(
this.pageService.pages$.pipe(map((pages) => (vm) => ({ ...vm, pages }))),
this.relativePageChangeSubject.pipe(map((rel) => (vm) => ({ ...vm, index: vm.index + rel }))),
this.view.mode$.pipe(map((mode) => (vm) => ({ ...vm, mode })))
).pipe(
startWith((s) => s), // if necessary, force an initial value to be emitted from the initial value in scan.
scan((s, reducer) => {
const next = reducer(s);
// update currentPages on the next state here.
return next;
}, { currentPages: [], index: 0, mode: Mode.SinglePage, pages: [] }),
shareReplay(1)
)
readonly currentPages$ = this.state$.pipe(
map(x => x.currentPages),
distinctUntilChanged()
);
constructor(private pageService: PageService, private view: ViewService) { }
goToNextPage() {
this.relativePageChangeSubject.next(1);
}
}
Notes:
Upvotes: 1