Reputation: 133
I'm facing an issue where my Kafka consumer group rebalances roughly every 5-10 minutes for no apparent reason. I use Kafka together with Apache Storm. A typical sequence: backpressure-related log lines (Storm internals) repeat for a while, then I see Node 2147482646 disconnected along with lines containing "coordinator unavailable.isDisconnected: true", and after that everything rebalances. Even though Kafka keeps working, this doesn't look like a healthy situation to me.
Almost all my topics have 3 replicas and 10 partitions, and I have 10 spouts (10 consumers).
I've read that the following settings can affect this:
heartbeat.interval.ms
session.timeout.ms
rebalance.timeout.ms
My setup was:
heartbeat.interval.ms: 5000
session.timeout.ms: 30000
rebalance.timeout.ms: 60000
I thought that maybe, due to some connection issue, 4-5 heartbeat messages per session window were not enough, so I tried heartbeat.interval.ms: 2000, but it did not help.
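For reference, the relationship between these settings can be sketched with plain properties (the class and helper below are illustrative, not part of my topology; the key names are the standard Kafka consumer config keys and the values are the ones from my setup):

```java
import java.util.Properties;

public class ConsumerTimeouts {
    // Builds the timeout-related consumer settings discussed above.
    static Properties timeoutProps() {
        Properties props = new Properties();
        // Heartbeats are sent by a background thread; several must be
        // missed before the coordinator evicts the member.
        props.setProperty("heartbeat.interval.ms", "5000");
        // Without a heartbeat for this long, the coordinator declares
        // the consumer dead and triggers a rebalance.
        props.setProperty("session.timeout.ms", "30000");
        // Maximum time the group waits for each member to rejoin
        // during a rebalance.
        props.setProperty("rebalance.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = timeoutProps();
        int heartbeat = Integer.parseInt(p.getProperty("heartbeat.interval.ms"));
        int session = Integer.parseInt(p.getProperty("session.timeout.ms"));
        // Rule of thumb: heartbeat.interval.ms should be at most a third
        // of session.timeout.ms, so several heartbeats fit in one window.
        System.out.println("heartbeats per session window: " + session / heartbeat);
    }
}
```

With these values, six heartbeats fit in one session window, which is why lowering heartbeat.interval.ms to 2000 (fifteen per window) was my first guess.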
From digging through the Kafka client library, I understand the disconnect happens in NetworkClient's poll() method, where handleDisconnections(responses, updatedNow) is called and the selector is already disconnected by that point.
What else can cause such kind of issue?
2022-09-30 10:59:54.599 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:54.599 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:54.601 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:54.601 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:54.614 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 1 is in LocalityScope RACK_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 2 is in LocalityScope WORKER_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 3 is in LocalityScope RACK_LOCAL
2022-09-30 10:59:54.615 o.a.s.g.LoadAwareShuffleGrouping refresh-load-timer [DEBUG] targetTask 4 is in LocalityScope WORKER_LOCAL
2022-09-30 10:59:54.630 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.630 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.680 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.680 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.731 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.731 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.781 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.781 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.801 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:54.801 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:54.802 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:54.803 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:54.831 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.832 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.882 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.882 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.933 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.933 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.983 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:54.983 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:54.995 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FETCH response from node 1003 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-segmentation.check.groupid-1, correlationId=2202): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=808668183, responses=[])
2022-09-30 10:59:54.995 o.a.k.c.FetchSessionHandler Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Node 1003 sent an incremental fetch response with throttleTimeMs = 0 for session 808668183 with 0 response partition(s), 1 implied partition(s)
2022-09-30 10:59:54.996 o.a.k.c.c.i.Fetcher Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Added READ_UNCOMMITTED fetch request for partition segmentation.check.queue-1 at position FetchPosition{offset=1136153, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-broker2:9092 (id: 1003 rack: /rack2)], epoch=4}} to node kafka-broker2:9092 (id: 1003 rack: /rack2)
2022-09-30 10:59:54.996 o.a.k.c.FetchSessionHandler Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Built incremental fetch (sessionId=808668183, epoch=2152) for node 1003. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s)
2022-09-30 10:59:54.996 o.a.k.c.c.i.Fetcher Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(segmentation.check.queue-1), canUseTopicIds=False) to broker kafka-broker2:9092 (id: 1003 rack: /rack2)
2022-09-30 10:59:54.996 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-segmentation.check.groupid-1, correlationId=2203) and timeout 60000 to node 1003: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=808668183, sessionEpoch=2152, topics=[], forgottenTopicsData=[], rackId='')
2022-09-30 10:59:55.003 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:55.003 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:55.005 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:55.005 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:55.033 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.034 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.084 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.084 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.134 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.134 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.185 o.a.s.d.w.WorkerState backpressure-check-timer [DEBUG] Checking for change in Backpressure status on worker's tasks
2022-09-30 10:59:55.185 o.a.s.d.w.BackPressureTracker backpressure-check-timer [DEBUG] Running Back Pressure status change check
2022-09-30 10:59:55.206 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Node 2147482646 disconnected.
2022-09-30 10:59:55.206 o.a.s.k.s.KafkaSpout Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Polled [0] records from Kafka
2022-09-30 10:59:55.206 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Resuming partitions []
2022-09-30 10:59:55.208 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] Topic partitions with entries ready to be retried [{}]
2022-09-30 10:59:55.208 o.a.k.c.c.KafkaConsumer Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Pausing partitions []
2022-09-30 10:59:55.208 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Group coordinator kafka-broker0:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2022-09-30 10:59:55.208 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FindCoordinator request to broker kafka-broker0:9092 (id: 1001 rack: /rack0)
2022-09-30 10:59:55.209 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='segmentation.check.queue')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node kafka-broker0:9092 (id: 1001 rack: /rack0)
2022-09-30 10:59:55.209 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-segmentation.check.groupid-1, correlationId=2205) and timeout 60000 to node 1001: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='segmentation.check.queue')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
2022-09-30 10:59:55.210 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204) and timeout 60000 to node 1001: FindCoordinatorRequestData(key='segmentation.check.groupid', keyType=0, coordinatorKeys=[])
2022-09-30 10:59:55.212 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received METADATA response from node 1001 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-segmentation.check.groupid-1, correlationId=2205): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1001, host='kafka-broker0', port=9092, rack='/rack0'), MetadataResponseBroker(nodeId=1003, host='kafka-broker2', port=9092, rack='/rack2'), MetadataResponseBroker(nodeId=1002, host='kafka-broker1', port=9092, rack='/rack1')], clusterId='fzb0hRraRb6mSwlmiWGOvA', controllerId=1001, topics=[MetadataResponseTopic(errorCode=0, name='segmentation.check.queue', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=1002, leaderEpoch=3, replicaNodes=[1002, 1003, 1001], isrNodes=[1001, 1002, 1003], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1001, leaderEpoch=1, replicaNodes=[1001, 1002, 1003], isrNodes=[1001, 1002, 1003], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=1003, leaderEpoch=4, replicaNodes=[1003, 1001, 1002], isrNodes=[1001, 1002, 1003], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-0 from 3 to epoch 3 from new metadata
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-2 from 1 to epoch 1 from new metadata
2022-09-30 10:59:55.212 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updating last seen epoch for partition segmentation.check.queue-1 from 4 to epoch 4 from new metadata
2022-09-30 10:59:55.213 o.a.k.c.Metadata Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Updated cluster metadata updateVersion 7 to MetadataCache{clusterId='fzb0hRraRb6mSwlmiWGOvA', nodes={1001=kafka-broker0:9092 (id: 1001 rack: /rack0), 1002=kafka-broker1:9092 (id: 1002 rack: /rack1), 1003=kafka-broker2:9092 (id: 1003 rack: /rack2)}, partitions=[PartitionMetadata(error=NONE, partition=segmentation.check.queue-0, leader=Optional[1002], leaderEpoch=Optional[3], replicas=1002,1003,1001, isr=1001,1002,1003, offlineReplicas=), PartitionMetadata(error=NONE, partition=segmentation.check.queue-2, leader=Optional[1001], leaderEpoch=Optional[1], replicas=1001,1002,1003, isr=1001,1002,1003, offlineReplicas=), PartitionMetadata(error=NONE, partition=segmentation.check.queue-1, leader=Optional[1003], leaderEpoch=Optional[4], replicas=1003,1001,1002, isr=1001,1002,1003, offlineReplicas=)], controller=kafka-broker0:9092 (id: 1001 rack: /rack0)}
2022-09-30 10:59:55.213 o.a.k.c.NetworkClient Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FIND_COORDINATOR response from node 1001 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1001, host='kafka-broker0', port=9092, coordinators=[])
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [DEBUG] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Received FindCoordinator response ClientResponse(receivedTimeMs=1664535595213, latencyMs=5, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-segmentation.check.groupid-1, correlationId=2204), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1001, host='kafka-broker0', port=9092, coordinators=[]))
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Discovered group coordinator kafka-broker0:9092 (id: 2147482646 rack: null)
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Group coordinator kafka-broker0:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2022-09-30 10:59:55.214 o.a.k.c.c.i.ConsumerCoordinator Thread-16-SegmentationCheck_Spout-executor[6, 6] [INFO] [Consumer clientId=consumer-segmentation.check.groupid-1, groupId=segmentation.check.groupid] Requesting disconnect from last known coordinator kafka-broker0:9092 (id: 2147482646 rack: null)
Upvotes: 2
Views: 8854
As I found out, if I set the log level to TRACE (not DEBUG), the reason becomes visible:
About to close the idle connection from 2147482646 due to being idle for 540005 millis
Node 2147482646 disconnected.
This is because:
connections.max.idle.ms defaults to 600000 ms (10 min) on the broker
connections.max.idle.ms defaults to 540000 ms (9 min) on the consumer
But I still can't figure out why the connection is idle in the first place. I've opened another question about it: When is a Kafka connection idle?
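One workaround (a sketch, not something I've verified fixes the root cause) is to raise the consumer-side connections.max.idle.ms above the broker's, so the client never closes the coordinator connection itself. Note this only moves the disconnect: if the connection stays idle longer than the broker's limit, the broker closes it instead. The class name and chosen value below are hypothetical:

```java
import java.util.Properties;

public class IdleConnectionConfig {
    // Broker-side default for connections.max.idle.ms (10 minutes).
    static final long BROKER_IDLE_MS = 600_000L;

    // Raise the consumer's idle timeout above the broker's default so the
    // "About to close the idle connection" path is not taken client-side.
    static Properties idleProps() {
        Properties props = new Properties();
        props.setProperty("connections.max.idle.ms", "660000"); // 11 min, example value
        return props;
    }

    public static void main(String[] args) {
        long consumerIdle = Long.parseLong(
                idleProps().getProperty("connections.max.idle.ms"));
        System.out.println("consumer idle timeout exceeds broker's: "
                + (consumerIdle > BROKER_IDLE_MS));
    }
}
```

The consumer default (9 min) is deliberately one minute below the broker default (10 min), so with stock settings the client side always disconnects first, which is exactly the "Node 2147482646 disconnected" line above.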
Upvotes: 3