AlanObject

Reputation: 9983

Call a function when an Observer subscribes to an RxJS pipe

I have a method that produces an RxJS observable. It works, but it seems a bit hacky, and I was wondering whether it can be improved with better use of RxJS operators.

What I have implemented is an object cache (in an Angular service) where events are generated when objects are loaded, edited, or changed. The event object looks like this:

export interface DNodeEvent {
  type: 'load' | 'unload' | 'update' | 'create' | 'error';
  node: DNode;
}

So I have a method that generates an observable for components to use. The caller provides an id/serial number and gets a stream of node objects. My current code looks like this:

  obById(id: number): Observable<DNode> {
    const unload = { type: 'unload' } as DNode;
    const pipeId =  this.nodeEvent$.pipe(
      filter(ne => ne.node.serial === id &&
        (ne.type === 'load' || ne.type === 'update')),
      map(ne => ne.node)
    );
    const pipeLoad = of(id).pipe(
      tap(loadTarget => this.loadId(loadTarget)),
      switchMap(loadTarget => of(unload)),
      filter(dn => dn.type === 'load')
    );
    return merge(pipeId, pipeLoad);
  }

The pipeId observable generates the stream of nodes and I don't see what to improve there.

However, the only purpose of the pipeLoad observable is to call the this.loadId method, which triggers an event on the nodeEvent$ Subject. If loadId finds the requested object in the cache, it sends the event immediately; otherwise it subscribes to an HTTP observable to fetch the object from the server and then generates the event.
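
For context, here is a minimal sketch of how a loadId along these lines might behave. It is an illustration, not the actual service code: the NodeCacheSketch class, the cache map, the fetchNode callback (standing in for the HTTP call) and the DNode shape reduced to a serial field are all assumptions.

```typescript
import { Observable, Subject } from 'rxjs';

// Assumed minimal shape; the real DNode presumably has more fields.
interface DNode { serial: number; }
interface DNodeEvent {
  type: 'load' | 'unload' | 'update' | 'create' | 'error';
  node: DNode;
}

class NodeCacheSketch {
  readonly nodeEvent$ = new Subject<DNodeEvent>();
  // Hypothetical in-memory cache keyed by serial number.
  private readonly cache = new Map<number, DNode>();
  // Hypothetical stand-in for the HTTP request.
  private readonly fetchNode: (id: number) => Observable<DNode>;

  constructor(fetchNode: (id: number) => Observable<DNode>) {
    this.fetchNode = fetchNode;
  }

  loadId(id: number): void {
    const cached = this.cache.get(id);
    if (cached !== undefined) {
      // Cache hit: the event is emitted synchronously.
      this.nodeEvent$.next({ type: 'load', node: cached });
      return;
    }
    // Cache miss: fetch, store, then emit once the response arrives.
    this.fetchNode(id).subscribe(node => {
      this.cache.set(id, node);
      this.nodeEvent$.next({ type: 'load', node });
    });
  }
}
```

The point that matters for the rest of the question is the cache-hit branch: the event goes out before loadId returns, so whoever wants it has to be subscribed to nodeEvent$ already.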

The only problem I have is that I don't want pipeLoad to emit anything. I ended up adding the switchMap/filter operators so that pipeLoad completes without emitting. I also had to stick the awkward unload constant in there to keep the compiler from complaining about type matching.

Is there a more elegant/robust way of getting this done?

Upvotes: 1

Views: 163

Answers (2)

Mauro Aguilar

Reputation: 1338

I don't understand the purpose of doing this:

const unload = { type: 'unload' } as DNode;

...
const pipeLoad = of(id).pipe(
  tap(loadTarget => this.loadId(loadTarget)),
  switchMap(loadTarget => of(unload)),          // <--- ?
  filter(dn => dn.type === 'load')              // <--- ?
);

return merge(pipeId, pipeLoad);

Wouldn't it be the same to do this?

obById(id: number): Observable<DNode> {
  const pipeId$ = this.nodeEvent$.pipe(
    filter(ne => ne.node.serial === id &&
      (ne.type === 'load' || ne.type === 'update')),
    map(ne => ne.node)
  );

  this.loadId(id);

  return pipeId$;
}

UPDATE:

Using a BehaviorSubject instead of a Subject, together with the code suggested above, may solve the issue. I've tried to reproduce your code in Stackblitz; check it out here.
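
One likely reason, offered as an assumption since the issue is not spelled out here: in the rewritten obById, loadId runs before the caller has subscribed, so a plain Subject would drop an event emitted synchronously on a cache hit, whereas a BehaviorSubject replays its latest value to late subscribers. A minimal sketch of that difference, using plain strings in place of DNodeEvent (the variable names are illustrative only):

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

// Plain Subject: values emitted before subscription are lost.
const plain$ = new Subject<string>();
plain$.next('load');
const fromPlain: string[] = [];
plain$.subscribe(v => fromPlain.push(v));

// BehaviorSubject: a new subscriber immediately receives the latest value.
const behavior$ = new BehaviorSubject<string>('init');
behavior$.next('load');
const fromBehavior: string[] = [];
behavior$.subscribe(v => fromBehavior.push(v));
```

Note that a BehaviorSubject keeps only the most recent value, so an event for a different node arriving in between would replace the one a late subscriber is waiting for.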

Upvotes: 0

Fan Cheung

Reputation: 11380

You can add ignoreElements() to mute the stream while still subscribing to it.

const pipeLoad = of(id).pipe(
  tap(loadTarget => this.loadId(loadTarget)),
  // ignoreElements() drops every value, so the switchMap/unload workaround is no longer needed
  ignoreElements()
);
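
Another option, sketched here under assumptions rather than taken from the original service: defer() runs its factory function each time something subscribes, and EMPTY completes without emitting, so the pair can trigger loadId on subscription with no tap, no ignoreElements and no unload constant. The NodeCache class, its loadCalls counter, the reduced DNode shape and the always-cache-hit loadId below are hypothetical stand-ins for illustration.

```typescript
import { EMPTY, Observable, Subject, defer, merge } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Assumed minimal shape; the real DNode presumably has more fields.
interface DNode { serial: number; }
interface DNodeEvent {
  type: 'load' | 'unload' | 'update' | 'create' | 'error';
  node: DNode;
}

// Hypothetical stand-in for the Angular service in the question.
class NodeCache {
  readonly nodeEvent$ = new Subject<DNodeEvent>();
  loadCalls = 0;

  // Stand-in for loadId(): treats every id as a cache hit and emits synchronously.
  loadId(id: number): void {
    this.loadCalls++;
    this.nodeEvent$.next({ type: 'load', node: { serial: id } });
  }

  obById(id: number): Observable<DNode> {
    const pipeId = this.nodeEvent$.pipe(
      filter(ne => ne.node.serial === id &&
        (ne.type === 'load' || ne.type === 'update')),
      map(ne => ne.node)
    );
    // The factory runs once per subscription; EMPTY completes without emitting.
    const pipeLoad = defer(() => {
      this.loadId(id);
      return EMPTY;
    });
    // pipeId is listed first so it is subscribed before loadId is called.
    return merge(pipeId, pipeLoad);
  }
}
```

Because merge subscribes to its arguments in order, pipeId is already listening when loadId fires, so a synchronous cache-hit event is not missed. Nothing happens when obById is called; loadId only runs once a component subscribes to the returned observable.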

Upvotes: 2
