ankush
ankush

Reputation: 167

Kafka consumer group id sharing issue

I have 2 kafka consumers that share the same consumer group id but subscribe to different topics. Each of them only has access to read from its corresponding topic.

When the 1st consumer is run it gets assigned the partitions from the topic it subscribes to. And when the 2nd consumer is run as well, the consumer group rebalances (causing the partitions assigned to the 1st consumer to get revoked). So far, so good. This is consistent with the discussion in Kafka Consumer Group Id and consumer rebalance issue.

But then, i start seeing TOPIC_AUTHORIZATION_FAILED in consumer 1 - apparently it tries to access the other topic that it doesn't subscribe to and has no access over. From this point forward the consumer doesn't move forward and keeps erroring out.

I was expecting that post rebalance, consumer 1 would get reassigned the partitions from the topic it subscribes to and continue on its way. Why is consumer 1 trying to access the other topic / How do i fix this?

Logs

common-request-test : Consumer 1's topic

common-request-dev: Consumer 2's topic

Below are the Consumer 1 logs (Notice the last couple of lines where it tries to access common-request-dev) -

{"@timestamp":"2021-10-22T07:39:17.550Z","message":"onPartitionsAssigned: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:39:17.853Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Found no committed offset for partition common-request-test-2","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.857Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-3 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1-devtest1:9093 (id: 0 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:17.858Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition common-request-test-1 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka2-devtest1:9093 (id: 1 rack: null)], epoch=0}}","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:39:19.382Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Resetting offset for partition common-request-test-2 to offset 0.","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.SubscriptionState","ex":""}

{"@timestamp":"2021-10-22T07:43:21.598Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Attempt to heartbeat failed since group is rebalancing","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Revoke previously assigned partitions common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.ConsumerCoordinator","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"onPartitionsRevoked: [common-request-test-2, common-request-test-3, common-request-test-0, common-request-test-1]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"c.m.l.e.s.a.config.KafkaReceiverConfig","ex":""}

{"@timestamp":"2021-10-22T07:43:21.599Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] (Re-)joining group","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}
{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Error while fetching metadata with correlation id 920 : {common-request-dev=TOPIC_AUTHORIZATION_FAILED}","severity":"WARN","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.NetworkClient","ex":""}

{"@timestamp":"2021-10-22T07:43:22.174Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Topic authorization failed for topics [common-request-dev]","severity":"ERROR","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"org.apache.kafka.clients.Metadata","ex":""}

{"@timestamp":"2021-10-22T07:43:22.175Z","message":"[Consumer clientId=consumer-stream-1, groupId=stream] Join group failed with org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [common-request-dev]","severity":"INFO","service":"stream","x-correlation-id":"","client-id":"","account-id":"","trace":"0781f628a70f1e76","thread":"reactive-kafka-stream-1","class":"o.a.k.c.c.internals.AbstractCoordinator","ex":""}

Upvotes: 0

Views: 1864

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191983

Sounds like you try either using the StickyAssignor for your consumer config, or use assign, not subscribe (or talk to the person maintaining the ACL policies, and tell them what access you need)

Otherwise, yes, rebalancing happens for the whole group's topic set. You're subscribing to a topic for the group, not a single consumer.

Upvotes: 1

Related Questions