Reputation: 21
I've following interfaces and Observable<Machine[]>
, what I want to achive is group by Machine symbol
property in Observable<Machine[]>
and return mapped observable Observable<Order[]>
.
export interface Machine {
symbol: string;
price: number;
amount: number;
id: number;
}
export interface Order {
symbol: string;
machines: OrderMachine[];
}
export interface OrderMachine {
price: number;
amount: number;
id: number;
}
I've tried to use RxJS gropBy operator but it seems it return grouped array one by one.
machines: Machine[] = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 }
];
of(machines).pipe(
takeUntil(this.unsubscribe),
mergeMap(res => res),
groupBy(m => m.symbol),
mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
map(x => { // here I have probably wrong model [string, Machine[]]
const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
return <Order>{ symbol: x[0], machines: orderMachines } })
);
as in result I have Observable<Order>
istead ofObservable<Order[]>
.
expected result model:
orders: Order[] = [
{
symbol: "A",
machines: [
{ amount: 1, price: 1, id: 1 },
{ amount: 1, price: 2, id: 2 }
]
}
];
Upvotes: 1
Views: 976
Reputation: 779
Here a possible solution based on your approach but with a few changes:
const machines = [
{ amount: 1, id: 1, symbol: "A", price: 1 },
{ amount: 1, id: 2, symbol: "A", price: 2 },
{ amount: 1, id: 3, symbol: "B", price: 3 }
];
from(machines) // (1)
.pipe(
// (2)
groupBy((m) => m.symbol),
mergeMap((group) => group.pipe(toArray())),
map((arr) => ({
symbol: arr[0].symbol, // every group has at least one element
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray(), // (3)
)
.subscribe(console.log);
(1) I changed of(machines)
to from(machines)
in order to emit the objects from machines
one by one into the stream. Before that change the whole array was emitted at once and thus the stream was broken.
(2) I removed takeUntil(this.unsubscribe)
and mergeMap(res => res)
from the pipe since there is no reason to have them in your example. takeUntil
wouldn't have any effect since the stream is finite and synchronous. An identity function (res => res
) applied with mergeMap
would make sense in a stream of streams which is not the case in your example. Or do you actually need these operators for your project because you have an infinite stream of observables?
(3) toArray()
is what transforms Observable<Order>
to Observable<Order[]>
. It waits until the stream ends and emits all streamed values at once as an array.
edit:
The op has mentioned that he rather needs a solution that is compatible with an infinite stream but because toArray
only works with finite streams the provided answer above would never emit anything in such scenario.
To solve this I would avoid using groupBy
from rxjs. It cvan be a very powerful tool in other cases where you need to split one stream into several groups of streams but in your case you simply want to group an array and there are easier methods for that that.
this.store.pipe(
select(fromOrder.getMachines)
map((arr) =>
// (*) group by symbol
arr.reduce((acc, { symbol, price, amount, id }) => {
acc[symbol] = {
symbol,
machines: (acc[symbol] ? acc[symbol].machines : [])
.concat({ price, amount, id })
};
return acc;
}, {})
),
)
.subscribe((result) =>
// (**)
console.log(Object.values(result))
);
(*) you could use a vanilla groupBy implementation that returns an object of the shape {[symbol: string]: Order}
.
(**) result
is an object here but you can convert it to an array easily but applying Object.values(result)
Upvotes: 3
Reputation: 21
@kruschid Thank you very much for your reply, it works properly but unfortynetelly, it doesn't work when I want to use it with my store (ngrx), type is ok but it stops to show log after mergeMap
method:
this.store.pipe(select(fromOrder.getMachines),
mergeMap(res => res), // Machine[]
groupBy((m) => m.symbol),
tap(x => console.log(x)), //this shows object GroupedObservable {_isScalar: false, key: "A", groupSubject: Subject, refCountSubscription: GroupBySubscriber}
mergeMap((group) => group.pipe(toArray())),
tap(x => console.log(x)), // this is not printed in console
map((arr) => <Order>({
symbol: arr[0].symbol,
machines: arr.map(({ price, amount, id }) => ({
price,
amount,
id
}))
})),
toArray())) // (3)
Upvotes: 0