BlackICE
BlackICE

Reputation: 8926

Combine latest value of observables AFTER every emission from source observable

I have a source Observable (actually a Subject) and when that Observable emits a value, I need to kick off a load of values that results in 2 more observables being updated, and when that load is done take the value of the source and combine it with the latest values of the 2 "dependent" observables, but only the values from AFTER the source observable emits. So if the 2 dependent observables have values when the source emits, I need this to wait until the 2 dependents get updates before emitting the latest of all 3 observables. I've tried using various incantations of withLatestFrom, switchMap, and combineLatest, but nothing is giving me the desired output:

This will emit values in the right shape, but not at the right time, it is using the values from before the SelectExpenseSummary has done its thing and the expenseDetails$ and expenseReceipts$ have updated.

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         withLatestFrom(
          this.expenseDetails$,
          this.expenseReceipts$,
          )
        )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

This emits a bunch of times, with the last time having the correct values, which would be OK, but emits the wrong shape, it's missing the source observable from the output:

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         switchMap(() =>
            combineLatest(this.expenseDetails$,
            this.expenseReceipts$,
            )
          )
        )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

This has the right output shape, having all 3 observables, but it never emits anything in the subscribe, I'm guessing because of the use of editExpenseSubject inside:

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         switchMap(x =>
          combineLatest(
            this.editExpenseSubject
            , this.expenseDetails$
            , this.expenseReceipts$
          )
        )
      )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

Can someone point me to the correct incantation to get what I'm after, preferably with detailed explanation as to why.


Update for additional info (I think this gets you through the whole flow):

    this.expenseDetails$ = this.expenseStore.pipe(
      select(fromExpenses.getExpenseLines)
    );
    this.expenseReceipts$ = this.expenseStore.pipe(
      select(fromExpenses.getExpenseReceipts)
    );

export const getExpenseLines = createSelector(StateFeatureSelector, x => x.expenseLines);
export const getExpenseReceipts = createSelector(StateFeatureSelector, x => x.expenseReceipts);

export interface State {
  expenseSummaries: Array<IExpenseSummary>;
  expenseLines: Array<IExpenseLine>;
  expenseReceipts: Array<IExpenseReceipt>;
  selectedUser: IDirectReport;
  selectedExpenseSummary: IExpenseSummary;
}


  @Effect() LoadExpenseLines$ = this.actions$.pipe(
    ofType<fromExpense.LoadExpenseLines>(fromExpense.ActionTypes.LoadExpenseLines)
    , tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseLines$:filtered', action}))
    , mergeMap(x =>
      this.service.getExpenseLines(x && x.payload)
        .pipe(
          tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseLines', receipts}))
          , map(lineItems => new fromExpense.SetExpenseLines(lineItems))
        )
    )
  );
  @Effect() LoadExpenseReceipts$ = this.actions$.pipe(
    ofType<fromExpense.LoadExpenseReceipts>(fromExpense.ActionTypes.LoadExpenseReceipts)
    , tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseReceipts$:filtered', action}))
    , mergeMap(x =>
      this.service.getExpenseReceipts(x && x.payload)
        .pipe(
          tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseReceipts', receipts}))
          , map(receipts => new fromExpense.SetExpenseReceipts(receipts))
        )
    )
  );

  @Effect() SelectExpenseSummary$ = this.actions$.pipe(
    ofType<fromExpense.SelectExpenseSummary>(fromExpense.ActionTypes.SelectExpenseSummary)
    , tap(action => this.log.debug({type: 'ExpenseEffects:SelectExpenseSummary$:filtered', action}))
    , mergeMap(x =>
        [
          new fromExpense.LoadExpenseLines(x.payload)
          , new fromExpense.LoadExpenseReceipts(x.payload)
        ]
    )
  );

export class SelectExpenseSummary implements Action {
  readonly type = ActionTypes.SelectExpenseSummary;
  constructor(public payload: IExpenseSummary) {}
}

export class LoadExpenseLines implements Action {
  readonly type = ActionTypes.LoadExpenseLines;
  constructor(public payload: IExpenseSummary) {}
}
export class SetExpenseLines implements Action {
  readonly type = ActionTypes.SetExpenseLines;
  constructor(public payload: Array<IExpenseLine>) {}
}

export class LoadExpenseReceipts implements Action {
  readonly type = ActionTypes.LoadExpenseReceipts;
  constructor(public payload: IExpenseSummary) {}
}

export class SetExpenseReceipts implements Action {
  readonly type = ActionTypes.SetExpenseReceipts;
  constructor(public payload: Array<IExpenseReceipt>) {}
}

export function reducer (state = initialState, action: actions.ActionsUnion): State {
  switch (action.type) {
// ...  other actions cut
    case actions.ActionTypes.SetExpenseLines:
      return {
        ...state,
        expenseLines: action.payload && [...action.payload] || []
      };
    case actions.ActionTypes.SetExpenseReceipts:
      return {
        ...state,
        expenseReceipts: action.payload && [...action.payload] || []
      };
    default:
      return state;
  }
}

// from the service class used in the effects

  getExpenseLines(summary: IExpenseSummary): Observable<Array<IExpenseLine>> {
    this.log.debug({type: 'ExpenseService:getExpenseLines', summary, uri: this.detailUri});
    if (summary) {
      return this.http
        .post<Array<IExpenseLine>>(this.detailUri, {ExpenseGroupId: summary.ExpReportNbr})
        .pipe(
          tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
        );
    } else {
      this.log.log({ type: 'ExpenseService:getExpenseLines null summary'});
      return of<Array<IExpenseLine>>([])
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
      );
    }
  }
  getExpenseReceipts(summary: IExpenseSummary): Observable<Array<IExpenseReceipt>> {
    this.log.debug({type: 'ExpenseService:getExpenseReceipts', summary, uri: this.receiptUri});
    if (summary) {
      return this.http
      .post<Array<IExpenseReceipt>>(this.receiptUri, {ExpenseGroupId: summary.ExpReportNbr})
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
      );
    } else {
      this.log.log({ type: 'ExpenseService:getExpenseReceipts null summary'});
      return of<Array<IExpenseReceipt>>([])
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
      );
    }
  }

Upvotes: 3

Views: 6153

Answers (2)

user4676340
user4676340

Reputation:

I'm going from @Reactgular's assumption :

You want to take an emitted value from the subject, and then emit from 2 other observables their latest value. With the output being an array of 3 items.

You want the 2 inner observables to fetch new data after the outer observable emits, and you only want their first value.

From there, the operator that comes to mind is skip : it skips the number of values you ask to skip and continues afterward.

I have made a sandbox for you to see it in action : as you can see, both observables are updated from a single source, and you get only one event, which is the last values of each observable.

related code :

import { BehaviorSubject, combineLatest, fromEvent } from "rxjs";
import {
  delayWhen,
  skip,
  distinctUntilKeyChanged,
  distinctUntilChanged,
  skipUntil,
  switchMap,
  map
} from "rxjs/operators";

const counts = {
  source: 0,
  first: 0,
  second: 0
};

const source$ = new BehaviorSubject("source = " + counts.source);
const first$ = new BehaviorSubject("first = " + counts.first);
const second$ = new BehaviorSubject("second = " + counts.second);

// Allow the source to emit
fromEvent(document.querySelector("button"), "click").subscribe(() => {
  counts.source++;
  source$.next("source = " + counts.source);
});

// When the source emits, the others should emit new values
source$.subscribe(source => {
  counts.first++;
  counts.second++;
  first$.next("first = " + counts.first);
  second$.next("second = " + counts.second);
});

// Final result should be an array of all observables

const final$ = source$.pipe(
  switchMap(source =>
    first$.pipe(
      skip(1),
      switchMap(first =>
        second$.pipe(
          skip(1),
          map(second => [source, first, second])
        )
      )
    )
  )
);

final$.subscribe(value => console.log(value));

Upvotes: 4

Reactgular
Reactgular

Reputation: 54821

If I understand you correctly....

You want to take an emitted value from the subject, and then emit from 2 other observables their latest value. With the output being an array of 3 items.

You want the 2 inner observables to fetch new data after the outer observable emits, and you only want their first value.

 this.editExpenseSubject.pipe(
        switchMap(editExpense =>
            forkJoin(
               this.expenseDetails$.pipe(first()),
               this.expenseReceipts$.pipe(first()),
            ).pipe(
               map(values => ([editExpense, ...values]))
            )
          )
        )

You can use a switchMap() so that it calls subscribe on the inner observable, and use a forkJoin() to get the final values. Adding a first() operator to limit the observables to 1 value. You then use map() to place the outer value back into the results array.

Upvotes: 2

Related Questions