Reputation: 3171
My application reads a topic from Kafka, enriches each record, and writes the result to another topic. StreamsConfig.NUM_STREAM_THREADS_CONFIG is set to 8. I have two brokers and 12 partitions.
Topic: enriched-request PartitionCount: 12 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
Topic: enriched-request Partition: 0 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 1 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 2 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 3 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 4 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 5 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 6 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 7 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 8 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 9 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 10 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 11 Leader: 8 Replicas: 8,7 Isr: 7,8
For the past two weeks, I have been getting this log message in my test environment:
INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group
I get this for all 8 threads. It is triggered every 5 minutes, and each time it is a new set of 8 threads.
In my kafka.log I see:
Member my-enrichments-client-StreamThread-4-consumer-6409090a-9d06-4bc0-8dd0-cd4c8bd28d71 in group my-enrichments has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
Again, this appears for all 8 threads. As above, every 5 minutes a new set of 8 threads is removed.
Only one application is running in my test environment. I tried waiting 15-20 minutes before redeploying, but I keep getting the same error. Does anyone have an idea how I can solve this without having to change StreamsConfig.CLIENT_ID_CONFIG or StreamsConfig.APPLICATION_ID_CONFIG?
Consumer configuration
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // bootstrapServers: placeholder for my bootstrap servers
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
If it helps, this started happening a week or so after I increased the number of threads from 3 to 8. I don't know whether the two are related.
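As a sanity check on the thread count, here is a minimal sketch of how tasks would spread over threads. It assumes a single sub-topology reading a 12-partition input topic (so 12 tasks) and a perfectly balanced assignment. The class and method names are hypothetical, and this is not the actual Kafka Streams assignor.

```java
import java.util.Arrays;

class TaskSpread {
    // Spread `tasks` over `threads` as evenly as possible.
    // Assumption: a balanced round-robin assignment, not the real Streams assignor.
    static int[] spread(int tasks, int threads) {
        int[] perThread = new int[threads];
        for (int t = 0; t < tasks; t++) {
            perThread[t % threads]++;
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(spread(12, 8))); // current setup: 8 threads
        System.out.println(Arrays.toString(spread(12, 3))); // previous setup: 3 threads
    }
}
```

Under these assumptions every one of the 8 threads still owns at least one task, so none of them would sit idle purely because of the partition count.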
Upvotes: 1
Views: 2630
Reputation: 3171
Since I couldn't find an answer, I tried the following:
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --describe --group my-enrichments
With this I got the warning: Warning: Consumer group 'my-enrichments' is rebalancing.
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka-server:9092 --delete --group my-enrichments
This caused the error: Error: Deletion of some consumer groups failed:
* Group 'my-enrichments' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
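The GroupNotEmptyException is expected while the application is running, because a group can only be deleted once it has no active members. Separately, the 5-minute cadence happens to match the default max.poll.interval.ms of the plain consumer (300000 ms). Whether that is related here is only a guess. A minimal sketch of the consumer timeout overrides one could experiment with follows. It uses plain string keys with the "consumer." prefix that Kafka Streams applies to embedded-consumer settings. The class name is hypothetical and every value is illustrative, not a recommendation.

```java
import java.util.Properties;

class TimeoutOverrides {
    // Consumer-level timeout overrides for a Kafka Streams configuration.
    // Assumption: the values below are placeholders to experiment with.
    static Properties consumerTimeouts() {
        Properties p = new Properties();
        // Maximum allowed gap between poll() calls before the member leaves the group.
        p.put("consumer.max.poll.interval.ms", 600000);
        // How long the broker waits for heartbeats before marking the member as failed.
        p.put("consumer.session.timeout.ms", 30000);
        // Heartbeat period, kept at one third of the session timeout here.
        p.put("consumer.heartbeat.interval.ms", 10000);
        return p;
    }

    public static void main(String[] args) {
        consumerTimeouts().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These could be merged into the configuration from the question with streamsConfiguration.putAll(TimeoutOverrides.consumerTimeouts()) before the KafkaStreams instance is built.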
Upvotes: 1