KRico

Reputation: 641

Kafka is giving: "The group member needs to have a valid member id before actually entering a consumer group"

I am using Kafka to consume messages in Java. I want to test by starting the same app multiple times on my local box. The first instance starts consuming messages from the topic without problems. When I start a second instance I get:

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

and it doesn't get any messages from the topic. If I start more instances, I get the same issue.

The configuration I am using for Kafka is

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false

I have two topics

@Configuration
public class KafkaTopics {
    @Bean("alertsTopic")
    public NewTopic alertsTopic() {

        return TopicBuilder.name("XXX.alerts")
            .compact()
            .build();
    }

    @Bean("serversTopic")
    public NewTopic serversTopic() {

        return TopicBuilder.name("XXX.servers")
            .compact()
            .build();
    }

}

And two listeners in different class files.

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=java.lang.String",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
    })
public void registerServer(
    @Payload(required = false) ServerInfo serverInfo
) 

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
    id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
    })
public void processAlert(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
    @Header(KafkaHeaders.OFFSET) long offset,
    @Payload(required = false) AlertOnKafka alert)

Upvotes: 31

Views: 81706

Answers (7)

winter

Reputation: 2917

This error can also occur when your principal doesn't have the "Create" ACL on the requested topic. If you create the topic manually, the warning stops. Alternatively, you can add the Create ACL to your service principal. I just had the same issue, and none of the other suggested answers worked for me.

Upvotes: 0

zeyuan zhang

Reputation: 1

Resolved. Cause: in a 3-node Kafka cluster, one node's disk dropped out and a new disk was mounted in its place. The error started happening after that.

Upvotes: 0

zwessels

Reputation: 637

If the answer from @Archimedes Trajano doesn't work for you (as in my case), then this happens when Kafka can't pick up the consumer group id.

If you have a single consumer group, you can specify it in the properties file like this:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: insert-your-consumer-group-id
      ... rest of your properties ...

Or, if you have multiple consumers, you can specify the groupId for each one:

@KafkaListener(topics = "topic-1", groupId = "group-1")
public void registerServer(@Payload(required = false) ServerInfo serverInfo)

@KafkaListener(topics = "topic-2", groupId = "group-2")
public void processAlert(@Payload(required = false) AlertOnKafka alert)

docs: https://docs.spring.io/spring-kafka/docs/2.8.11/reference/html/#annotation-properties

Upvotes: 2

Abhimanyu Singh

Reputation: 131

In my experience this is caused by a missing group instance id configuration.

Below is what I use while configuring the kafka consumer for a group.

consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "SchedulerCoordinator" + UUID.randomUUID());

Bear in mind that I personally prefer class-based configuration over property-based configuration; these consumerProperties are passed in when the consumer is initialised.
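As a rough sketch of the same idea using plain property keys: "group.instance.id" is the key behind ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, and as far as I know it enables static group membership, which normally expects an id that is unique per instance and stays the same across restarts. A random UUID is unique but changes on every start, so the version below derives the id from an instance name instead. The class name, method name, and the "scheduler" / "instance-1" values are assumptions made up for illustration, not part of the answer above.

```java
import java.util.Properties;

// Hypothetical helper for illustration only. The three property keys are real
// Kafka consumer settings; everything else (names, values) is an assumption.
class StaticMemberConfigSketch {

    // Builds consumer properties with a group.instance.id derived from a
    // caller-supplied instance name, so the id is the same on every restart.
    static Properties consumerProperties(String bootstrapServers, String groupId, String instanceName) {
        if (instanceName == null || instanceName.trim().isEmpty()) {
            throw new IllegalArgumentException("instanceName must not be blank");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG.
        props.put("group.instance.id", groupId + "-" + instanceName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("kafka:9092", "scheduler", "instance-1");
        // Prints "scheduler-instance-1".
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```

When running several copies on one machine, each copy would need its own instance name (for example from a command-line argument or environment variable), since two consumers in the same group should not share a group.instance.id.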

Upvotes: 4

zhili

Reputation: 61

Hmm, in my case the consumers rejoin the group with a generated ID after the error. Here are my test and the resulting log, FYI:

@Test
void testSyncSend() throws ExecutionException, InterruptedException {
    int id = (int) (System.currentTimeMillis() / 1000);
    SendResult result = producer.syncSend(id);
    logger.info("[testSyncSend] id:{}, result:{}", id, result);
    new CountDownLatch(1).await();
}

2021-04-20 14:32:20.980  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1618900340980
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-1, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-3-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-4, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-2-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-3, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-1-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-2, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.317  INFO 6672 --- [           main] c.z.s.cbaConnector.KafkaProducerTest     : [testSyncSend] id:1618900340, result:SendResult [producerRecord=ProducerRecord(topic=test1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 122, 104, 105, 108, 105, 46, 115, 109, 115, 109, 111, 100, 117, 108, 101, 46, 101, 110, 116, 105, 116, 121, 46, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Message(id=1618900340), timestamp=null), recordMetadata=test1-1@0]
2021-04-20 14:32:23.770  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Finished assignment for group at generation 2: {consumer-listener1-4-9a383250-e84d-413a-b012-85405abdcf7f=Assignment(partitions=[test1-8, test1-9]), consumer-listener1-2-3d26d9ef-b973-4d19-a930-5ba77d938680=Assignment(partitions=[test1-3, test1-4, test1-5]), consumer-listener1-1-10b6895e-264e-45bd-ba90-c71ea12b21e5=Assignment(partitions=[test1-0, test1-1, test1-2]), consumer-listener1-3-54ce965a-87cd-4e28-b0e9-b0f2c9f69423=Assignment(partitions=[test1-6, test1-7])}
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.776  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Adding newly assigned partitions: test1-5, test1-4, test1-3
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Adding newly assigned partitions: test1-6, test1-7
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Adding newly assigned partitions: test1-0, test1-2, test1-1
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Adding newly assigned partitions: test1-9, test1-8
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Found no committed offset for partition test1-8
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Found no committed offset for partition test1-6
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-2
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-1
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-5 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Setting offset for partition test1-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Setting offset for partition test1-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-4 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.792  INFO 6672 --- [listener1-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-5, test1-4, test1-3]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-4, groupId=listener1] Resetting offset for partition test1-8 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-3, groupId=listener1] Resetting offset for partition test1-6 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-2 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-1 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-9, test1-8]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-6, test1-7]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-0, test1-2, test1-1]
2021-04-20 14:32:23.817  INFO 6672 --- [listener1-0-C-1] c.z.s.cbaConnector.KafkaConsumer         : [KafakaConsumer][consume] thread:21 received message:{"id":1618900340}

Upvotes: 0

DaHoC

Reputation: 367

I encountered the same issue, also preventing my consumer from subscribing to certain topics.

I figured that the member.id might be similar to the client-id (using Camel here), which might be in some way related to the consumer group.

What fixed it for me was changing the client-id of my consuming service to a fixed, non-changing value, while leaving the consumer group as is.

Upvotes: 0

Archimedes Trajano

Reputation: 41290

From my analysis, this is normal behaviour; you can change the log level to exclude the message.

If the server detects that the client supports member.id, it returns this error to the client on its first join attempt. This is described in KIP-394.

The client then rejoins the group with a generated member ID.
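To make the handshake concrete, here is a toy simulation of the exchange described above. It is not Kafka code: the class, method, and field names are all hypothetical, and the real protocol carries far more state. It only models the two steps: a first join with an empty member id is rejected and a generated id is handed back, then the retry with that id succeeds.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Toy model of the KIP-394 join handshake. NOT Kafka code: every name here is
// hypothetical and exists only to illustrate the reject-then-rejoin sequence.
class MemberIdHandshakeSketch {

    // Result of one simulated join request.
    static final class JoinResponse {
        final boolean memberIdRequired; // true models the MemberIdRequiredException case
        final String memberId;          // id the coordinator generated or accepted

        JoinResponse(boolean memberIdRequired, String memberId) {
            this.memberIdRequired = memberIdRequired;
            this.memberId = memberId;
        }
    }

    // Simulated group coordinator.
    static final class Coordinator {
        private final Set<String> pendingMemberIds = new HashSet<>();
        private final Set<String> members = new HashSet<>();

        JoinResponse joinGroup(String clientId, String memberId) {
            if (memberId.isEmpty()) {
                // First attempt: generate an id, remember it, and reject the join.
                String generated = clientId + "-" + UUID.randomUUID();
                pendingMemberIds.add(generated);
                return new JoinResponse(true, generated);
            }
            if (pendingMemberIds.remove(memberId) || members.contains(memberId)) {
                // Retry with a known id: admit the member.
                members.add(memberId);
                return new JoinResponse(false, memberId);
            }
            throw new IllegalStateException("Unknown member id: " + memberId);
        }

        int memberCount() {
            return members.size();
        }
    }

    // Simulated client: on rejection, rejoin once with the id that was handed back.
    static String join(Coordinator coordinator, String clientId) {
        JoinResponse first = coordinator.joinGroup(clientId, "");
        if (!first.memberIdRequired) {
            return first.memberId;
        }
        System.out.println(clientId + ": join failed, member id required; rejoining");
        return coordinator.joinGroup(clientId, first.memberId).memberId;
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String memberId = join(coordinator, "consumer-listener1-1");
        System.out.println("joined as " + memberId);
    }
}
```

In this model the "failure" is just the first half of a two-step join, which matches the log pattern of "Join group failed" followed immediately by "(Re-)joining group" and then a successful join.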

Upvotes: 23
