DRNR
DRNR

Reputation: 453

shareReplay until certain condition - refetch data - again use shareReplay, possible?

I have a case where I want to cache a http response, which contains lots of data needed all over the app and does not change (until now... thus the question...), thus my immediate thought was to use shareReplay, which works wonders no need to fetch the same set of data more than once during the lifetime of the app for that session. Easy enough, declare a variable, e.g allData$ and subscribe to that in components needed, e.g service:

allData$ = this.getAllData().pipe(shareReplay())

But in one component... I can now actually modify that data which this endpoint returns, for example add a new item and therefore I need to at these scenarios refetch the data and again cache that new response. Is there a nice rxjs way to do this? I was wanting to do something like this all like in a "one liner" using only that single allData$ variable above. What I have resorted to now seems kinda ugly to me, adding a new BehaviorSubject into the mix and a function that components can call when needed to update this data, i.e service code:

allData = new BehaviorSubject({} as MyModel);
allData$ = this.allData.asObservable();

constructor(private http: HttpClient) {
  this.getAllData().subscribe();
}

// this is called by component, when the db has been modified and need to update allData
refetchData() {
  this.getAllData().subscribe();
}

getAllData() {
  return this.http.get<MyModel>('url').pipe(
    // updating the stream...
    tap(value => this.allData.next(value))
  )
}

I'm sure there is a nice rxjs operator which does all this?

Upvotes: 2

Views: 1136

Answers (1)

Daniel Gimenez
Daniel Gimenez

Reputation: 20654

A super useful operator for situations like these is scan. It allows you to modify an initial state from stream of changes. The stream can be of whatever you like, but often I'll just stream functions since that is the most flexible.

dataChanges = new Subject<DataItem>();

allData$ = this.allData.pipe(
  switchMap(initialState => dataChanges.pipe(
    scan((state, newItem) => [...state, newItem] /* modify how you like */, initialState),
    startWith(initialState)
  )),
  shareReplay(1)
);

The above code gets the initial data and then subscribes to the observable that streams changes. In my example I just add to the array and return it, but you can basically do whatever you want.

As I said above you could also pass functions that return an updated state. This might be a little harder to grasp initially, but is both cleaner and more flexible.

dataReducers = new Subject<(s: AllData) => AllData>();

allData$ = concat(
  this.allData.pipe(map(take(1)) // get initial data.
  this.dataReducers
).pipe(
  scan((state: AllData, reducer: (x: AllData) => AllData) => reducer(state)),
  shareReplay(1)
);

My one issue with scan is setting the initial value. In the first example I have to create that inner stream. In the second example I take advantage of the fact that if you don't provide a seed, scan will return the first value piped to it.

The typing of scan's parameter is wonky if you do it the second way. By explicitly declaring the types I get around that wonkiness, but that could cause errors if you're not careful. For example, if I used merge instead of concat and didn't use the take(1) operator then it's possible a non-function would get passed as the reducer.

You could add a check to see if the reducer is actually a function, or you can pass an empty object as the seed parameter, and map the initial observable to a reducer function that doesn't take a parameter. However once you do the things you've lost the cleanliness advantage. For that reason I prefer the form in the first example with the reducers from the second.

Upvotes: 1

Related Questions