Reputation: 3574
I'm trying to group my collection of data using the RxJS operators and split it in multiple streams in my Angular app, but cannot seem to get it to work. In my SignalRService
I'm setting up a SignalR trigger in the constructor, such that when data is passed from the server, it will pass it to the Subject I created.
export class SignalRService {
private connection?: signalR.HubConnection;
private orders = new Subject<OrderModel[]>();
orders$ = this.orders.asObservable();
constructor() {
// ... SignalR connection functions ...
this.connection?.on('GetOrders', (data: OrderModel[]) => {
this.orders.next(data);
});
}
};
In the OrderService
I subscribe to the orders$
Subject by using some pipe operators, because I want to split the data in 3 different streams based on the status
of the Order
object.
I flatten the map, use groupBy and then merge again using the key and corresponding data, however, this does not work for some reason and I'm not sure where I should look.
When I use the tap
operators in between the current operators, it only logs the first two taps. It never seems to get to the third and thus never executes the subscribe I suppose.
Also, when this.orders.next(data)
in SignalRService
gets executed twice or more, nothing happens.
export class OrderService {
// Observable sources
private orderCategory0 = new BehaviorSubject<OrderModel[]>([]);
private orderCategory1 = new BehaviorSubject<OrderModel[]>([]);
private orderCategory2 = new BehaviorSubject<OrderModel[]>([]);
private orders = [this.orderCategory0, this.orderCategory1, this.orderCategory2];
// Observable streams
orderCategory0$ = this.orderCategory0.asObservable();
orderCategory1$ = this.orderCategory1.asObservable();
orderCategory2$ = this.orderCategory2.asObservable();
constructor(private signalRService: SignalRService) {
signalRService.orders$
.pipe(
mergeMap((res) => res),
//tap((res) => console.log(res)), <-- This one shows
groupBy((order: OrderModel) => order.status),
//tap((res) => console.log(res)), <-- This one shows
mergeMap((group) => zip(of(group.key), group.pipe(toArray())))
//tap((res) => console.log(res)), <-- This one doesn't
)
.subscribe(([groupId, data]) => this.orders[groupId].next(data));
}
};
Note that when I do something like the following in OrderService
, everything works as expected:
signalRService.orders$.subscribe((data: OrderModel[]) => {
const groups = this.groupData(data);
this.orderCategory0.next(groups[0]);
this.orderCategory1.next(groups[1]);
this.orderCategory2.next(groups[2]);
});
Currently, I'm lost, maybe I'm doing this completely wrong, so any pointers would be appreciated.
Edit: Also, when I hardcode the Orders and use of(orders).pipe(...).subscribe(...)
, and thus omit the signalRService.order$
-part, everything works fine as well.
Upvotes: 2
Views: 732
Reputation: 20454
The issue is that group.pipe(toArray())
is never going to emit, because group is an observable that stays open (presumably until order$ completes), and toArray waits to emit until the observable completes. That being said, the use of groupBy was probably overkill to begin with.
If I interpret you example correctly, you're creating the groups just to pass them off to the correct behavior subject which are in a collection keyed by status. You can just use array.reduce (or Ramda's groupBy) to create groups, no need to convert the array to an observable if you don't really need the stream.
signalRService.orders$.subscribe((orders: OrderModel[]) => {
const groups = orders.reduce((acc, cur) => {
(acc[cur.status] || (acc[cur.status] = [])).push(cur);
return acc;
},
{} as Record<string, OrderModel[]>);
Object.values(entries).forEach(([k, v]) => this.orders[k].next(order);
});
If you really need the groupBy (maybe this is a simplification of a bigger problem), then don't use zip and have an inner pipe instead.
You probably don't need the subjects either. You can map orders to the grouped object and then project the individual groups as needed.
const readonly orders$ = signalRService.orders$.pipe(
map((orders) => orders.reduce((acc, cur) => {
(acc[cur.status] || (acc[cur.status] = [])).push(cur);
return acc;
},
{} as Record<string, OrderModel[]>)
),
shareReplay(1)
);
const readonly orderCategory0$ = this.orders$.pipe(map(x => x['status0'] || []));
const readonly orderCategory1$ = this.orders$.pipe(map(x => x['status1'] || []));
const readonly orderCategory2$ = this.orders$.pipe(map(x => x['status2'] || []));
Upvotes: 3