fionbio

Reputation: 3554

Create single observable per event type & destroy on last unsubscribe

Imagine a stream of messages, each with an associated user id. For each message that comes in, fetch the associated user information (a 'user-fetch' observable). These user-fetch observables stay alive and monitor any future changes to the target user.

Questions:

  1. How to prevent duplicate 'user-fetch' observables from being created for a given user-id (and reuse the possibly already created observable)?
  2. How to correctly clean up all user-fetch observables on unsubscribe and/or complete?

Where I'm at:

  1. I wasn't able to determine an existing operator or methodology to prevent duplicate observables, so I wrote an operator similar to switchMap. I don't love it. How is this done in practice?

  2. If I can solve 1, I believe the solution to correct cleanup and reuse is refCount().

Upvotes: 3

Views: 181

Answers (1)

If I understood the problem correctly, you have one stream that emits ids and, driven by its events, another stream that receives data related to each id from a remote place (the server).

The solution I suggest is to create a store that holds the cached data. When a message arrives from the id stream, check the store and return either the cached data or the response of a new request.

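import { BehaviorSubject, from, of } from "rxjs";
import { switchMap, tap, withLatestFrom } from "rxjs/operators";

// Assumption: customLog is a logging helper defined in the demo; console.log stands in for it here.
const customLog = console.log;

/**
 * callBackEnd$ mocks an HTTP request
 */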
let callBackEnd$ = id => {
  customLog("___________________");
  customLog("Calling the server for " + id);
  customLog("___________________");

  return of({ id: id, data: `Some data about ${id}` });
};

/**
 * idStream$ mocks the stream of ids to be fetched through HTTP requests
 */
let idStream$ = from([1, 2, 2, 3, 1, 5, 3, 4, 5]);

/**
 * We use reqStore$ to cache the already retrieved data
 */
let reqStore$ = new BehaviorSubject([]);

/**
 *  1. We subscribe to the message stream (the stream that tells us what to load).
 *
 *  2. With `withLatestFrom` we take the current store, check it for cached data, and return
 *  either the cached data or the response of a new request.
 *
 *  3. If the value emitted by `switchMap` is not in our store yet, we add it.
 */
idStream$
  .pipe(
    tap(message => customLog(`Receiving command to retrieve : ${message}`)),
    withLatestFrom(reqStore$),
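    // Note: switchMap cancels an in-flight request when a new id arrives. That is fine for
    // the synchronous mock; with a real asynchronous request, concatMap or mergeMap would keep earlier responses.
    switchMap(([e, store]) => {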
      let elementSaved = store.find(x => x.id === e);
      return elementSaved ? of(elementSaved) : callBackEnd$(e);
    }),
    withLatestFrom(reqStore$),
    tap(([response, store]) => {
      if (!store.find(x => x.id === response.id)) {
        reqStore$.next([...store, response]);
      }
    })
  )
  .subscribe(([currentResponse, currentStore]) => {
    customLog("Receiving response for " + currentResponse.data);
  });
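
The store above caches responses, but it does not cover the cleanup half of the question (one live source per id, destroyed when its last subscriber leaves). The bookkeeping for that can be sketched in plain JavaScript without any library. This is only an illustration under assumptions: `createKeyedRegistry`, `start`, and `subscribeToUser` are hypothetical names, not RxJS APIs. In RxJS itself, `share()` or `refCount()` provide the subscriber counting for a single observable; the part sketched here is the `Map` that keeps one such shared source per id.

```javascript
// Hypothetical helper: keeps one live source per key and tears it down
// when the last listener for that key unsubscribes.
// `start(key, emit)` must begin the underlying source and return a stop function.
function createKeyedRegistry(start) {
  const entries = new Map(); // key -> { listeners: Set, stop: Function }

  return function subscribe(key, listener) {
    let entry = entries.get(key);
    const isNew = !entry;
    if (isNew) {
      entry = { listeners: new Set(), stop: () => {} };
      entries.set(key, entry);
    }

    // Wrap the listener so the same function can be registered more than once.
    const token = { listener };
    entry.listeners.add(token);

    if (isNew) {
      // First subscriber for this key: start the underlying source exactly once.
      entry.stop = start(key, value => {
        entry.listeners.forEach(t => t.listener(value));
      });
    }

    let active = true;
    return function unsubscribe() {
      if (!active) return; // unsubscribing twice is a no-op
      active = false;
      entry.listeners.delete(token);
      if (entry.listeners.size === 0) {
        // Last subscriber left: forget the entry and stop the source.
        entries.delete(key);
        entry.stop();
      }
    };
  };
}

// Usage with a mocked "user-fetch" source that records start/stop calls.
const started = [];
const stopped = [];
const emitters = new Map();
const subscribeToUser = createKeyedRegistry((id, emit) => {
  started.push(id);
  emitters.set(id, emit);
  return () => {
    stopped.push(id);
    emitters.delete(id);
  };
});

const received = [];
const unsubA = subscribeToUser(1, v => received.push("A:" + v));
const unsubB = subscribeToUser(1, v => received.push("B:" + v)); // reuses the source for id 1
emitters.get(1)("update");
unsubA();
const stoppedAfterFirst = stopped.length; // source still running, B is subscribed
unsubB(); // last subscriber gone, source for id 1 is stopped
```

Subscribing again to the same id after teardown starts a fresh source, which mirrors what a ref-counted observable does once its subscriber count has dropped to zero.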

Here is a live demo on CodeSandbox. I hope that helps you out :)

Upvotes: 1
