Reputation: 53
I'm developing a PoC with Kafka Streams. I need to get the offset value in the stream consumer and use it to generate a unique key, (topic-offset)->hash, for each message. The reason is that the producers are syslog sources and only a few of them provide IDs. I cannot generate a UUID in the consumer because, in case of reprocessing, I need to regenerate the same key.
My problem is that the org.apache.kafka.streams.processor.ProcessorContext class exposes an .offset() method that returns the value, but I'm using a KStream instead of a Processor, and I couldn't find a method that returns the same thing.
Does anybody know how to extract the offset for each record from a KStream? Thanks in advance.
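With the low-level Processor API I could do something like this (just a sketch; the key format is only an illustration), but I'd like the equivalent from a KStream:

    import org.apache.kafka.streams.processor.AbstractProcessor;

    // Low-level Processor: the record's coordinates are available via context().
    public class KeyingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            // Deterministic key: the same record yields the same key on reprocessing.
            String uniqueKey = context().topic() + "-" + context().partition() + "-" + context().offset();
            context().forward(uniqueKey, value);
        }
    }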
Upvotes: 5
Views: 8643
Reputation: 6619
Unfortunately, it seems that if one Kafka Streams application is assigned multiple partitions, which creates different tasks, the ProcessorContext might end up shared across those tasks and then report topic=null, partition=-1, offset=-1.
Has anyone encountered this?
Edit: the reason for this is that I had violated the TransformerSupplier contract and returned the same instance from it. Always creating a new instance fixes the issue.
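To illustrate the contract (a minimal sketch; MyTransformer just stands in for the application's own Transformer):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.TransformerSupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Placeholder Transformer that keys each record by topic-partition-offset.
    class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) { this.context = context; }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            return KeyValue.pair(context.topic() + "-" + context.partition() + "-" + context.offset(), value);
        }

        @Override
        public void close() { }
    }

    // Broken: every task receives the SAME Transformer, so init() from one task
    // overwrites the context used by another and topic()/partition()/offset()
    // can come back as null/-1.
    class BrokenSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
        private final MyTransformer shared = new MyTransformer();

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return shared; // same instance every time -- violates the contract
        }
    }

    // Fixed: get() returns a new Transformer per call, so each task owns its own context.
    class FixedSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new MyTransformer();
        }
    }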
Upvotes: 0
Reputation: 62330
You can mix and match the DSL and the Processor API via process(...), transform(...), and transformValues(...).
This lets you access the current record's offset the same way as in the plain Processor API. In your case, it seems you want to use KStream#transform(...).
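A minimal sketch with KStream#transform (topic names and String key/value types are just assumptions; note that the supplier must return a new Transformer on every call):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class OffsetKeyExample {

        public static Topology buildTopology() {
            StreamsBuilder builder = new StreamsBuilder();
            // "syslog-input" and "syslog-keyed" are placeholder topic names.
            KStream<String, String> syslog = builder.stream("syslog-input");

            KStream<String, String> keyed = syslog.transform(
                // The lambda is the TransformerSupplier: it creates a fresh Transformer per call.
                () -> new Transformer<String, String, KeyValue<String, String>>() {
                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        // Gives access to topic(), partition(), and offset() of the current record.
                        this.context = context;
                    }

                    @Override
                    public KeyValue<String, String> transform(String key, String value) {
                        // Deterministic key: the same record gets the same key on reprocessing.
                        String uniqueKey = context.topic() + "-" + context.partition() + "-" + context.offset();
                        return KeyValue.pair(uniqueKey, value);
                    }

                    @Override
                    public void close() { }
                });

            keyed.to("syslog-keyed");
            return builder.build();
        }
    }

From there, the topic-partition-offset string can be hashed to produce the final key you described.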
Upvotes: 7