Reputation: 1377
In my Java app, once every couple of seconds, I assign a specific TopicPartition to a consumer and try to read a specific message from a specific topic and partition. After reading the message (using poll()), I immediately disconnect the consumer.
Because the scenario above can run in a multi-threaded environment, the consumer group name is a prefix plus a random hash, e.g. my_consumer_group_EWQSV (since Kafka will not assign the same partition to two consumers in the same group).
The problem is that I couldn't tell Kafka to delete those consumers after they disconnect (they are only temporary). Is there any way of doing this? Not manually, I mean through configuration or something similar; I wasn't able to find any setting like "auto-delete-after-consumer-disconnect".
Thanks :)
Upvotes: 1
Views: 4086
Reputation: 1377
In Java, to manually consume a specific message from Kafka without creating a consumer group, the following is sufficient:
// group.id is deliberately left unset
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
TopicPartition tp = new TopicPartition(topic, partition);
// try-with-resources closes the consumer after the single poll
try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    // Assign the partition manually instead of subscribing to the topic
    consumer.assign(Collections.singletonList(tp));
    // Position the consumer at the offset of the wanted message
    consumer.seek(tp, offset);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
}
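The important parts are that no group.id is configured, auto-commit is disabled, and the partition is assigned with assign() (followed by seek()) rather than subscribe().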
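For illustration only, the same settings can be written with plain string keys, which makes it easy to check that no group.id slips in. This is a rough sketch, not part of the original answer: the class name GrouplessConsumerConfig and its build method are hypothetical, and the keys are the standard string forms of the ConsumerConfig constants used above.

```java
import java.util.Properties;

// Hypothetical helper (an assumption, not from the original answer):
// builds consumer settings and deliberately leaves group.id unset.
final class GrouplessConsumerConfig {
    private GrouplessConsumerConfig() {}

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // No offsets are committed, so nothing is stored per consumer.
        props.put("enable.auto.commit", "false");
        // Deserializers given by class name instead of Class object.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties could presumably be passed to new KafkaConsumer<>(...) in place of kafkaProps, with assign(), seek() and poll() used exactly as in the snippet above.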
Upvotes: 2