Daniel B

Reputation: 8879

Setting bufferCount value based on source observable

I have a stream of RxJS events that I pass to a custom operator. Each event is one of:

  1. A single event, meaning it is complete when it is received.
  2. A partial event, meaning it is one part of a larger, chunked event.

The events are of the types CustomEvent and ChunkedCustomEvent, where the latter is built from the data of the former.

interface ChunkedCustomEvent {
  id: string; // The id that chunks are grouped by
  index: number; 
  chunk: string; 
  chunkCount: number; // The number of chunks that make up the whole event
}

My custom operator groups the chunked events by id, buffers each group with bufferCount until the buffer is full, and then emits. This works great when the chunk count happens to equal the currently set buffer count, but I can't wrap my head around how to set the bufferCount value from chunkCount, so that it is not constant but differs for each chunk group.

At the moment, the operator looks like this, but it obviously uses a static number for the bufferCount.

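import { MonoTypeOperatorFunction, pipe } from 'rxjs';
import { bufferCount, groupBy, map, mergeMap } from 'rxjs/operators';

export function handleChunkedEvents(): MonoTypeOperatorFunction<CustomEvent> {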
  return pipe(
    groupBy(customEvent => {
      const chunkedEvent: ChunkedCustomEvent = { ...customEvent.data };
      return chunkedEvent.id;
    }),
    mergeMap(group => {
      return group.pipe(
        bufferCount(4), // I want this to be equal to the known chunk count
        map(groupOfChunked => {
          // Do something with the grouped chunks and return it as a whole
          // If it isn't a chunked event, return the event instead
        })
      )
    })
  );
}

Is there a way to achieve what I want or am I approaching this the wrong way?

Upvotes: 1

Views: 274

Answers (1)

backtick

Reputation: 2775

bufferCount is just a convenient special case of buffer, which buffers based on the emission of a "signal" observable. That's the way we want to go here.

We're only dealing with the mergeMap below as it's all we need to change from your example.

mergeMap((group$) => {
  // Handle the non-chunked case: the group with an undefined key emits
  // only non-chunked events, so pass it through untouched
  if (!group$.key) return group$;

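  // From the group, create a signal observable that fires when the event's
  // chunkCount equals its zero-based position in the group plus 1
  // (filter is the pipeable operator, imported alongside buffer and map)
  const chunkCountReached$ = group$.pipe(filter((event, i) => event.data.chunkCount === i + 1));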

  return group$.pipe(
    buffer(chunkCountReached$),
    map((groupOfChunked) => {
      // Do something with the grouped chunks and return it as a whole
    })
  );
});
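
The map callback above is left as a placeholder. As a rough sketch of one way to fill it in, assuming the chunks are meant to be ordered by index and concatenated (the question doesn't say how chunks combine, so that is an assumption), something like the following could work. ChunkEnvelope and assembleChunks are hypothetical names; the only thing taken from the question is that each event carries a ChunkedCustomEvent in its data field.

```typescript
// Shape copied from the question.
interface ChunkedCustomEvent {
  id: string;
  index: number;
  chunk: string;
  chunkCount: number;
}

// Hypothetical envelope type: the question only shows that events have `data`.
interface ChunkEnvelope {
  data: ChunkedCustomEvent;
}

// Hypothetical helper: sorts a copy of the buffered chunks by `index`
// (they may arrive out of order) and joins the chunk strings.
function assembleChunks(groupOfChunked: ChunkEnvelope[]): string {
  return groupOfChunked
    .slice() // copy, so the buffered array itself is not reordered
    .sort((a, b) => a.data.index - b.data.index)
    .map((event) => event.data.chunk)
    .join("");
}

// Example buffer, deliberately out of order.
const buffered: ChunkEnvelope[] = [
  { data: { id: "a", index: 1, chunk: "lo ", chunkCount: 3 } },
  { data: { id: "a", index: 0, chunk: "hel", chunkCount: 3 } },
  { data: { id: "a", index: 2, chunk: "world", chunkCount: 3 } },
];

const assembled = assembleChunks(buffered);
console.log(assembled); // hello world
```

Inside the operator this would be used as map((groupOfChunked) => assembleChunks(groupOfChunked)), wrapped in whatever event type you want to emit downstream.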

EDIT: This is also a valid solution, and has the benefit of completing the group observable once chunkCount is reached.

const chunkCountReached$ = group$.pipe(filter((event, i) => event.data.chunkCount === i + 1));

return group$.pipe(
  takeUntil(chunkCountReached$),
  toArray(),
  map((groupOfChunked) => {
    // Do something with the grouped chunks and return it as a whole
  })
);

FINAL EDIT (because I can't help but codegolf):

return group$.pipe(
  takeWhile((event, i) => i + 1 < event.data.chunkCount, true),
  toArray(),
  map((groupOfChunked) => {
    // Do something with the grouped chunks and return it as a whole
  })
);
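
The second argument to takeWhile (inclusive = true) is what keeps the last chunk: the predicate turns false on the final chunk, and without the flag that chunk would be dropped. To make that easier to see, here is a small array-based trace of the same predicate. takeWhileInclusive is a hypothetical stand-in for the operator, written only for illustration, and ChunkEnvelope is a trimmed-down assumed event shape.

```typescript
// Array analogue of RxJS takeWhile(predicate, true): keeps items while the
// predicate holds, plus the first item that fails it, then stops.
function takeWhileInclusive<T>(
  items: T[],
  predicate: (item: T, index: number) => boolean
): T[] {
  const stop = items.findIndex((item, i) => !predicate(item, i));
  return stop === -1 ? items.slice() : items.slice(0, stop + 1);
}

// Trimmed-down, assumed event shape: only the fields this trace needs.
interface ChunkEnvelope {
  data: { chunkCount: number; chunk: string };
}

// Same predicate as in the answer: true while more chunks are still expected.
const stillIncomplete = (event: ChunkEnvelope, i: number): boolean =>
  i + 1 < event.data.chunkCount;

// A three-chunk group followed by one stray event that must not be taken.
const threeChunks: ChunkEnvelope[] = ["a", "b", "c", "extra"].map((chunk) => ({
  data: { chunkCount: 3, chunk },
}));
const takenThree = takeWhileInclusive(threeChunks, stillIncomplete).map(
  (event) => event.data.chunk
);
console.log(takenThree); // the three chunks "a", "b", "c"

// Edge case: chunkCount of 1. The predicate fails immediately, and the
// inclusive flag is the only reason the chunk is still emitted.
const singleChunk: ChunkEnvelope[] = [{ data: { chunkCount: 1, chunk: "solo" } }];
const takenSingle = takeWhileInclusive(singleChunk, stillIncomplete).map(
  (event) => event.data.chunk
);
console.log(takenSingle); // just "solo"
```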

Upvotes: 1
