Vatsal

Reputation: 2128

Issue with RxJS scan operator | Skipping previously accumulated values

I have two streams, A and B. Stream B depends on stream A and is expected to accumulate the values emitted by stream A over time. I am using the scan operator to achieve this. However, whenever stream A emits a value, the scan operator in stream B always starts from the seed value rather than from the previously accumulated value. As a result, my output is always based on the current response only, and I lose the earlier values as soon as a new value arrives.

Stream A is driven by our UI. I have two tabs and an input form where the user submits an account id. I need both the tab name and the submitted input. I am not sure what I am missing here. Any suggestions would be highly appreciated. The code is provided below.

private streamA$ = this.selectedTab$.pipe(
  switchMap((tab) =>
    this.accountSubmitted$.pipe(
      switchMap(({ account, type }) => {
        return this.myHttpService.method.pipe(
          map((response) => {
            return {
              type,
              response
            };
          })
        );
      })
    )
  ),
  share()
);

private streamB$ = this.streamA$.pipe(
  map(({ type, response }) => {
    const { graph } = response;
    return { type, graph };
  }),
  scan((accResponse, currentResponse) => {
    const { type, graph: linkedDetails } = currentResponse;
    const { nodes: newNodes = [], edges: newEdges = [] } = linkedDetails;
    if (type === 'init') {
      return {
        nodes: newNodes,
        edges: newEdges
      };
    }

    const { nodes: linkedNodes = [], edges: linkedEdges = [] } = accResponse;
    const allNodes = [...linkedNodes, ...newNodes];
    const allEdges = [...linkedEdges, ...newEdges];
    return {
      nodes: allNodes,
      edges: allEdges
    };
  }, {})
);

Upvotes: 0

Views: 551

Answers (1)

Vatsal

Reputation: 2128

If anyone is facing a similar problem: in my case the issue was not in the stream, which was working as expected. The real issue was in the UI. I had a loading screen in an ngIf/else block: the loader was shown whenever a new HTTP call was in progress, and my component, along with its async pipe, was shown once the response arrived. Since my component was destroyed and removed from the UI on every HTTP call, it was re-created and subscribed again each time. Hence the accumulator (the previously accumulated response) was always

{}

So I updated my component to always be rendered, with the loader shown or hidden according to the HTTP response. The component is no longer destroyed and resubscribed every time; it subscribes once and stays subscribed. Now scan retains all previous values and the code works as expected.
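To make the cause concrete, here is a minimal, dependency-free sketch of why resubscribing resets the accumulator. It is not RxJS itself: `createSubject` and `scan` below are hypothetical stand-ins written only for this illustration, on the assumption that scan keeps its accumulator per subscription, which matches the behaviour described above.

```typescript
// Hypothetical stand-ins for illustration only; this is NOT the RxJS API.
type Observer<T> = (value: T) => void;
type Source<T> = { subscribe(observer: Observer<T>): () => void };

// A hot source: pushes each value to whoever is subscribed at that moment.
function createSubject<T>(): Source<T> & { next(value: T): void } {
  let observers: Observer<T>[] = [];
  return {
    next(value: T) {
      observers.slice().forEach((o) => o(value));
    },
    subscribe(observer: Observer<T>) {
      observers.push(observer);
      // Returns an unsubscribe function.
      return () => {
        observers = observers.filter((o) => o !== observer);
      };
    },
  };
}

// Models scan: the accumulator lives inside subscribe, so every
// new subscriber starts again from the seed.
function scan<T, A>(
  source: Source<T>,
  reducer: (acc: A, value: T) => A,
  seed: A
): Source<A> {
  return {
    subscribe(observer: Observer<A>) {
      let acc = seed; // fresh state per subscription
      return source.subscribe((value) => {
        acc = reducer(acc, value);
        observer(acc);
      });
    },
  };
}

const responses = createSubject<string[]>();
const accumulated = scan(
  responses,
  (all: string[], nodes: string[]) => [...all, ...nodes],
  [] as string[]
);

// Case 1: the consumer is destroyed and re-created around every emission
// (what the ngIf/else loader was doing to the component).
const resubscribed: string[][] = [];
for (const nodes of [["a"], ["b"], ["c"]]) {
  const unsubscribe = accumulated.subscribe((v) => resubscribed.push(v));
  responses.next(nodes);
  unsubscribe();
}

// Case 2: the consumer subscribes once and stays subscribed.
const persistent: string[][] = [];
const stop = accumulated.subscribe((v) => persistent.push(v));
for (const nodes of [["a"], ["b"], ["c"]]) {
  responses.next(nodes);
}
stop();

console.log(JSON.stringify(resubscribed)); // [["a"],["b"],["c"]]
console.log(JSON.stringify(persistent)); // [["a"],["a","b"],["a","b","c"]]
```

In the first case every emission is reduced against the seed, which is the symptom from the question. In the second case the single subscription keeps its accumulator, which is what keeping the component mounted achieves.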

Hope this helps someone.

Happy Learning

Vatsal

Upvotes: 1
