Reputation: 35
I have an akka application (in JAVA) that uses commitablePartitionedSource
for consuming messages from kafka topics. I have a few consumer groups that spin up consumers for multiple topics. This is driven by a dynamic configuration where I can shutdown consumers temporarily and maybe start them back at a later point.
When this consumer restarts, I only want to read new messages and not start from where I left off.
Is there a way to get the kafkaConsumer object from akka-alpakka consumer, so that I can seekToEnd() before processing? Please let me know if there is any other way to achieve this? Maybe with akka config or a different type of consumer? I prefer not maintaining my own offsets (Hopefully not the only option)
My config is set up to get the latest
offset when I start a consumer group, but since I am shutting down and restarting individual consumers it always starts consuming where I left off.
I tried creating a consumer group for a topic, but I have many topics and it turns out pretty resource intensive. I also looked for a way to clear offsets stored in kafka for that topic unsuccessfully.
Upvotes: 1
Views: 2722
Reputation: 301
commitablePartitionedSource
takes AutoSubscription
as an input, which you cannot specify offset.
What you need is a method that takes ManualSubscription
or higher level Subscription
, such as
> plainExternalSource
> committableExternalSource
> plainSource
...
Upvotes: 0
Reputation: 2130
The easiest way would just be to create a new consumer group each time you start your restart your consumer. Kafka will take care of removing stale consumer groups after a configurable amount of time (retention.ms).
This strategy is fine if you rarely restart your consumer and always want it to process fresh data instead of catching up with all the missed messages.
EDIT
As far as I know the only way to have access to the underlying KafkaConsumer is to use a committableExternalSource
. This way you'll have access to the seekToEnd
method, however you'll also need take care of subscribing to the topic providing the start offset per partition (similarly as how you're doing now setting up the committablePartitionedSource
but outside of Akka).
Upvotes: 1