Jean Wisser

Reputation: 55

How to manually commit Kafka offsets after FileIO in Apache Beam?

I have a FileIO write that writes a PCollection<GenericRecord> to files and returns a WriteFilesResult<DestinationT>.

I would like to add a DoFn after the write that commits the offsets of the written records to Kafka. However, the offsets are stored in my GenericRecords, so I can no longer access them in the output of FileIO.

What is the best way to solve this?

Upvotes: 1

Views: 200

Answers (1)

Jean Wisser

Reputation: 55

For anyone interested, here is how I did it:

  • Manually group the records by DestinationT with a GroupByKey.
  • For each group, collect the list of offsets, create a new key EnrichedDestinationT that carries them, and flatten the iterable back into individual records.
  • The PCollection entering FileIO is therefore a PCollection<KV<EnrichedDestinationT, GenericRecord>>.
  • In FileIO, .by() becomes .by(KV::getKey) and .via() becomes .via(Contextful.fn(KV::getValue), Contextful.fn(this::getSink)).
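
To make the data shape of the first three steps concrete, here is a minimal plain-Java sketch (Java 16+, no Beam dependency). It is not the actual pipeline code: Rec, EnrichedDestination, Keyed and enrich are hypothetical stand-ins for GenericRecord, EnrichedDestinationT, KV<EnrichedDestinationT, GenericRecord> and the GroupByKey plus flatten steps, and it assumes the destination is a plain String.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EnrichedKeySketch {

    // Hypothetical stand-in for a GenericRecord that stores its Kafka offset.
    public record Rec(String destination, long offset, String payload) {}

    // Hypothetical stand-in for EnrichedDestinationT:
    // the original destination plus the offsets of every record in its group.
    public record EnrichedDestination(String destination, List<Long> offsets) {}

    // Hypothetical stand-in for KV<EnrichedDestinationT, GenericRecord>.
    public record Keyed(EnrichedDestination key, Rec value) {}

    public static List<Keyed> enrich(List<Rec> records) {
        // Step 1: group the records by destination (GroupByKey in the pipeline).
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.destination(), d -> new ArrayList<>()).add(r);
        }

        List<Keyed> out = new ArrayList<>();
        for (Map.Entry<String, List<Rec>> group : groups.entrySet()) {
            // Step 2: collect the offsets of the group and build the enriched key.
            List<Long> offsets = group.getValue().stream()
                    .map(Rec::offset)
                    .collect(Collectors.toList());
            EnrichedDestination key = new EnrichedDestination(group.getKey(), offsets);

            // Step 3: flatten the group back into one keyed element per record.
            for (Rec r : group.getValue()) {
                out.add(new Keyed(key, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("topic-a/2024-01-01", 1L, "x"),
                new Rec("topic-b/2024-01-01", 2L, "y"),
                new Rec("topic-a/2024-01-01", 3L, "z"));

        // Every element now carries the offsets of its whole group in its key.
        for (Keyed kv : enrich(input)) {
            System.out.println(kv.key() + " -> " + kv.value().payload());
        }
    }
}
```

The point of the enriched key is that the destination is what survives the write, so the offsets travel with it. In the real pipeline I would expect them to be readable from the destination keys of the WriteFilesResult, but how the commit DoFn is wired after that is not shown here.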

Upvotes: 1
