Stefano Frigerio

Reputation: 53

How can I get the offset value in KStream

I'm developing a PoC with Kafka Streams. I need to get the offset of each record in the stream consumer and use it to generate a unique key, (topic-offset) -> hash, for each message. The reason is that the producers are syslog sources and only a few of them have IDs. I cannot generate a UUID in the consumer because, if the data is reprocessed, I need to regenerate the same key.

My problem is: the org.apache.kafka.streams.processor.ProcessorContext class exposes an .offset() method that returns the value, but I'm using KStream instead of the Processor, and I couldn't find a method that returns the same thing.

Does anybody know how to extract the consumer offset for each record from a KStream? Thanks in advance.

Upvotes: 5

Views: 8643

Answers (2)

Athlan

Reputation: 6619

Unfortunately, it appears that if one Kafka Streams application is assigned multiple partitions, which creates different tasks, the ProcessorContext might be assigned to different tasks, and then topic=null, partition=-1, offset=-1.

Has anyone encountered this?


Edit: the reason was that I had broken the TransformerSupplier contract by returning the same instance from it every time. Always creating a new instance fixes the issue.
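
To illustrate the general hazard, here is a plain-Java analogy rather than actual Kafka code. FakeTransformer and SupplierDemo are hypothetical stand-ins I made up for this sketch, and a String plays the role of the per-task context. It only shows how a shared instance lets one task's init() overwrite another's state; it does not reproduce the exact Kafka Streams internals.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a Transformer: it remembers the context passed to init().
final class FakeTransformer {
    private String taskContext;

    void init(String taskContext) {
        this.taskContext = taskContext;
    }

    String taskContext() {
        return taskContext;
    }
}

// Hypothetical demo class; nothing here is part of the Kafka Streams API.
final class SupplierDemo {
    private static final FakeTransformer SHARED = new FakeTransformer();

    // Broken: every call hands out the same object, so all tasks share its state.
    static Supplier<FakeTransformer> brokenSupplier() {
        return () -> SHARED;
    }

    // Correct: every call creates a fresh object, so each task owns its state.
    static Supplier<FakeTransformer> correctSupplier() {
        return FakeTransformer::new;
    }

    // Simulates two tasks that each obtain and initialise a transformer,
    // then reports which context the first task's transformer ends up holding.
    static String contextSeenByFirstTask(Supplier<FakeTransformer> supplier) {
        FakeTransformer first = supplier.get();
        first.init("task-0");
        FakeTransformer second = supplier.get();
        second.init("task-1");
        return first.taskContext();
    }
}
```

With the broken supplier, the first task's transformer ends up holding the second task's context; with the correct one, it keeps its own.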

Upvotes: 0

Matthias J. Sax

Reputation: 62330

You can mix and match the DSL and the Processor API via process(...), transform(...), and transformValues(...).

This gives you access to the current record's offset, just as in the plain Processor API. In your case, it seems you want to use KStream#transform(...).
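
Once the transformer has the record's coordinates (the ProcessorContext exposes topic(), partition(), and offset()), deriving a reproducible key could look roughly like the sketch below. RecordKeys and deterministicKey are hypothetical names, not part of Kafka Streams, and the choice of SHA-256 is my assumption. I also include the partition, because offsets are only unique within a single partition.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; not part of the Kafka Streams API.
final class RecordKeys {
    private RecordKeys() {
    }

    // Hashes "topic-partition-offset" so the same record always yields the same key,
    // including when the topic is reprocessed from the beginning.
    static String deterministicKey(String topic, int partition, long offset) {
        String coordinates = topic + "-" + partition + "-" + offset;
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(coordinates.getBytes(StandardCharsets.UTF_8));
            // Render the 32-byte digest as a 64-character lowercase hex string.
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory on every Java platform, so this should not happen.
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

Inside the transformer you would call something like RecordKeys.deterministicKey(context.topic(), context.partition(), context.offset()) and emit the result as the new key.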

Upvotes: 7
