Reputation: 11
I'm trying to create multiple consumers with different consumer groups for a Kafka topic using kafka-clients v0.10.2.1. However, I'm not able to retrieve the last offset committed by a consumer group.
Currently my consumer properties look like this:
Properties cproperties = new Properties();
cproperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
cproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker");
cproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
cproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, taskDecoder.getClass());
cproperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
Without the auto.offset.reset property I can't consume from the topic, but I can't use this config: I need the consumer group registered in ZooKeeper. So I need to create the consumer group under /consumers in ZooKeeper too.
Upvotes: 1
Views: 9655
Reputation: 1
I fixed a similar issue by adding the property below to the Kafka consumer configuration:
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
Upvotes: 0
Reputation: 39880
You need to set the property auto.offset.reset to earliest (or latest, depending on what you are trying to achieve) so that no exception is thrown when an offset is not found (probably because the data has been deleted).
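As a rough sketch of that setting, the snippet below builds the consumer properties with plain string keys, which are the values behind the ConsumerConfig constants used in the question. The class name ConsumerProps and the method build are made up for illustration, and the group ID and broker address are whatever you pass in.

```java
import java.util.Properties;

// Hypothetical helper; not part of kafka-clients.
class ConsumerProps {
    // Builds consumer properties with auto-commit disabled and
    // auto.offset.reset set, using the raw config key names.
    static Properties build(String groupId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("group.id", groupId);                  // ConsumerConfig.GROUP_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("enable.auto.commit", "false");        // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        // Used only when the group has no valid committed offset.
        props.put("auto.offset.reset", "earliest");      // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        return props;
    }
}
```

The deserializer and timeout settings from the question would be added to the same Properties object before it is passed to the consumer.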
You also need to make sure that you commit offsets manually, since you've disabled auto-commit:
cproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
To do so, you can use either commitSync():
This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
or commitAsync():
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
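The "lastProcessedMessageOffset + 1" rule can be illustrated without a broker. The sketch below is only an assumption about how you might track progress: CommitOffsets and nextOffsets are hypothetical names, and each processed record is represented as a {partition, offset} pair rather than a real ConsumerRecord.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of kafka-clients.
class CommitOffsets {
    // Takes {partition, offset} pairs for the records already processed and
    // returns, per partition, the offset that should be committed: the highest
    // processed offset plus one, i.e. the next message to consume.
    static Map<Integer, Long> nextOffsets(long[][] processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (long[] record : processed) {
            int partition = (int) record[0];
            long candidate = record[1] + 1;
            // Keep the largest candidate seen for this partition.
            next.merge(partition, candidate, Long::max);
        }
        return next;
    }
}
```

With a real consumer, each entry would be wrapped as a TopicPartition key and an OffsetAndMetadata value and passed to the commitSync overload that accepts a map of offsets.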
Note that if you don't commit offsets, an exception will be thrown if auto.offset.reset is set to none.
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
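The rules above can be read as a small decision function. The following is a simplified model for illustration only, not the client's actual implementation: OffsetReset and resolveStart are hypothetical names, a null committed offset stands for "no initial offset", and the earliest and latest offsets of the partition are assumed to be known.

```java
// Hypothetical model of the auto.offset.reset rules; not part of kafka-clients.
class OffsetReset {
    // Returns the offset at which consumption would start under this model.
    static long resolveStart(Long committed, long earliest, long latest, String policy) {
        // A committed offset that still exists on the server is used as-is.
        if (committed != null && committed >= earliest && committed <= latest) {
            return committed;
        }
        // Otherwise the reset policy decides.
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // "none" and any other value: raise an error to the consumer.
                throw new IllegalStateException("No valid offset and reset policy is " + policy);
        }
    }
}
```

For example, resolveStart(null, 5, 20, "earliest") yields 5, while the same call with "none" throws, which mirrors the behaviour described for a group that has never committed.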
Upvotes: 0