Reputation: 8879
I have a scenario in which I have a stream of RxJS events that I pass to a custom operator. The events can be either
The events are of the type CustomEvent
and CustomChunkedEvent
, where the latter is built from the data of the first one.
interface ChunkedCustomEvent {
id: string; // The id that chunks are grouped by
index: number;
chunk: string;
chunkCount: number; // The number of chunks that make up the whole event
}
My custom operator groups the chunked events by the id, and then buffers the events with bufferCount
until full, and then emits. This works great if the chunk count is the same as the current set buffer count, but I can't wrap my head around how I would set the bufferCount
value to the chunkCount
value so it's not constant but instead different for each chunk group.
At the moment, the operator looks like this, but it's obviously with a static number for the bufferCount
.
export function handleChunkedEvents(): MonoTypeOperatorFunction<CustomEvent> {
return pipe(
groupBy(customEvent => {
const chunkedEvent: ChunkedCustomEvent = { ...customEvent.data };
return chunkedEvent.id;
}),
mergeMap(group => {
return group.pipe(
bufferCount(4), // I want this to be equal of the known chunk count
map(groupOfChunked => {
// Do something with the grouped chunks and return it as a whole
// If it isn't a chunked event, return the event instead
})
)
})
);
}
Is there a way to achieve what I want or am I approaching this the wrong way?
Upvotes: 1
Views: 274
Reputation: 2775
bufferCount
is just a convenient special case of buffer
, which buffers
based on the emission of a "signal" observable. That's the way we want to go
here.
We're only dealing with the mergeMap
below as it's all we need to change from
your example.
mergeMap((group$) => {
// Handle the non-chunked case - the group with a undefined key will emit
// only non-chunked events - just let it do its thing
if (!group$.key) return group$;
// From the group, create a signal observable that fires when the chunkCount
// matches the 0-index plus 1
const chunkCountReached$ = group$.filter((event, i) => event.data.chunkCount === i + 1);
return group$.pipe(
buffer(chunkCountReached$),
map((groupOfChunked) => {
// Do something with the grouped chunks and return it as a whole
})
);
});
EDIT: This is also a valid solution, and has the benefit of completing the
group observable once chunkCount
is reached.
const chunkCountReached$ = group$.filter((event, i) => event.data.chunkCount === i + 1);
return group$.pipe(
takeUntil(chunkCountReached$),
toArray(),
map((groupOfChunked) => {
// Do something with the grouped chunks and return it as a whole
})
);
FINAL EDIT (because I can't help but codegolf):
return group$.pipe(
takeWhile((event, i) => i + 1 < event.data.chunkCount, true),
toArray(),
map((groupOfChunked) => {
// Do something with the grouped chunks and return it as a whole
})
);
Upvotes: 1