Starfish

Reputation: 3574

RxJS observable does not trigger subscribe after mergeMap

I'm trying to group my collection of data using RxJS operators and split it into multiple streams in my Angular app, but I cannot get it to work. In my SignalRService I set up a SignalR handler in the constructor, so that when the server sends data, it is passed to the Subject I created.

export class SignalRService {
  private connection?: signalR.HubConnection;
  private orders = new Subject<OrderModel[]>();
  orders$ = this.orders.asObservable();
    
  constructor() {
    // ... SignalR connection functions ...
    
    this.connection?.on('GetOrders', (data: OrderModel[]) => {
      this.orders.next(data);
    });
  }
};

In the OrderService I subscribe to the orders$ Subject by using some pipe operators, because I want to split the data in 3 different streams based on the status of the Order object.

I flatten the map, use groupBy and then merge again using the key and corresponding data, however, this does not work for some reason and I'm not sure where I should look. When I use the tap operators in between the current operators, it only logs the first two taps. It never seems to get to the third and thus never executes the subscribe I suppose. Also, when this.orders.next(data) in SignalRService gets executed twice or more, nothing happens.

export class OrderService {
  // Observable sources
  private orderCategory0 = new BehaviorSubject<OrderModel[]>([]);
  private orderCategory1 = new BehaviorSubject<OrderModel[]>([]);
  private orderCategory2 = new BehaviorSubject<OrderModel[]>([]);
  private orders = [this.orderCategory0, this.orderCategory1, this.orderCategory2];
  // Observable streams
  orderCategory0$ = this.orderCategory0.asObservable();
  orderCategory1$ = this.orderCategory1.asObservable();
  orderCategory2$ = this.orderCategory2.asObservable();
  
  constructor(private signalRService: SignalRService) {
    signalRService.orders$
      .pipe(
        mergeMap((res) => res),
        //tap((res) => console.log(res)), <-- This one shows
        groupBy((order: OrderModel) => order.status),
        //tap((res) => console.log(res)), <-- This one shows
        mergeMap((group) => zip(of(group.key), group.pipe(toArray())))
        //tap((res) => console.log(res)), <-- This one doesn't
      )
      .subscribe(([groupId, data]) => this.orders[groupId].next(data));
  }
};

Note that when I do something like the following in OrderService, everything works as expected:

signalRService.orders$.subscribe((data: OrderModel[]) => {
  const groups = this.groupData(data);

  this.orderCategory0.next(groups[0]);
  this.orderCategory1.next(groups[1]);
  this.orderCategory2.next(groups[2]);
});

Currently, I'm lost, maybe I'm doing this completely wrong, so any pointers would be appreciated.

Edit: Also, when I hardcode the Orders and use of(orders).pipe(...).subscribe(...), and thus omit the signalRService.orders$ part, everything works fine as well.

Upvotes: 2

Views: 732

Answers (1)

Daniel Gimenez

Reputation: 20454

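The issue is that group.pipe(toArray()) never emits: each group is an observable that stays open (presumably until orders$ completes), and toArray only emits once its source completes. Since orders$ is backed by a Subject that never completes, zip never receives the array and nothing reaches subscribe. This is also why the of(orders) version works: of completes right after emitting. That being said, the use of groupBy was probably overkill to begin with.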
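
To see the completion behavior in isolation, here is a minimal sketch. The names source, emitted, countBeforeComplete and countAfterComplete are made up for illustration, and a plain Subject stands in for the SignalR-backed stream. It assumes the operators are imported from 'rxjs/operators'.

```typescript
import { Subject } from 'rxjs';
import { toArray } from 'rxjs/operators';

// Hypothetical stand-in for a long-lived stream such as orders$.
const source = new Subject<number>();
const emitted: number[][] = [];

// toArray buffers every value and emits one array when the source completes.
source.pipe(toArray()).subscribe((values) => emitted.push(values));

source.next(1);
source.next(2);
// Nothing has been emitted yet, because the source is still open.
const countBeforeComplete = emitted.length;

source.complete();
// Only now does toArray emit the buffered values as a single array.
const countAfterComplete = emitted.length;
```

In the question's pipeline nothing ever calls complete() on the Subject, so the buffered array is never released.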

If I interpret your example correctly, you're creating the groups just to pass them off to the correct behavior subjects, which are in a collection keyed by status. You can just use array.reduce (or Ramda's groupBy) to create the groups; there is no need to convert the array to an observable if you don't really need the stream.

signalRService.orders$.subscribe((orders: OrderModel[]) => {
  // Group the incoming orders by status.
  const groups = orders.reduce((acc, cur) => {
      (acc[cur.status] || (acc[cur.status] = [])).push(cur);
      return acc;
    },
    {} as Record<string, OrderModel[]>);
  // Push each group to the subject at the matching index (object keys are strings).
  Object.entries(groups).forEach(([k, v]) => this.orders[Number(k)].next(v));
});

If you really need the groupBy (maybe this is a simplification of a bigger problem), then don't use zip and have an inner pipe instead.
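
One way to read "an inner pipe" is sketched below; it is an interpretation, not necessarily the exact code the answer had in mind. Each incoming array is turned into its own finite stream with from, so the groups complete at the end of every batch and toArray can emit. The OrderModel shape, the orders$ Subject and the received array are assumptions for illustration.

```typescript
import { Subject, from } from 'rxjs';
import { groupBy, map, mergeMap, toArray } from 'rxjs/operators';

// Hypothetical minimal shape; the real OrderModel is not shown in the question.
interface OrderModel {
  id: number;
  status: number;
}

// Stand-in for signalRService.orders$.
const orders$ = new Subject<OrderModel[]>();
const received: Array<[number, OrderModel[]]> = [];

orders$
  .pipe(
    mergeMap((orders) =>
      // from(orders) completes after the last order, which closes every group.
      from(orders).pipe(
        groupBy((order: OrderModel) => order.status),
        mergeMap((group) =>
          group.pipe(
            toArray(),
            // Pair the collected orders with the key of their group.
            map((data): [number, OrderModel[]] => [group.key, data])
          )
        )
      )
    )
  )
  .subscribe((entry) => received.push(entry));

orders$.next([
  { id: 1, status: 0 },
  { id: 2, status: 1 },
  { id: 3, status: 0 },
]);
```

In the question's service the subscribe callback would be ([groupId, data]) => this.orders[groupId].next(data). Note that a status with no orders in a given batch produces no emission here, so its BehaviorSubject would keep the previous value unless it is reset separately.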

You probably don't need the subjects either. You can map orders to the grouped object and then project the individual groups as needed.

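// Class fields; this assumes signalRService is available as this.signalRService.
readonly orders$ = this.signalRService.orders$.pipe(
  // Turn each incoming array into an object keyed by status.
  map((orders) => orders.reduce((acc, cur) => {
      (acc[cur.status] || (acc[cur.status] = [])).push(cur);
      return acc;
    },
    {} as Record<string, OrderModel[]>)
  ),
  // Replay the latest grouped object to late subscribers.
  shareReplay(1)
);
// 'status0' etc. stand in for the actual status values.
readonly orderCategory0$ = this.orders$.pipe(map(x => x['status0'] || []));
readonly orderCategory1$ = this.orders$.pipe(map(x => x['status1'] || []));
readonly orderCategory2$ = this.orders$.pipe(map(x => x['status2'] || []));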

Upvotes: 3
