Eldar

Reputation: 10790

Strange Kafka Lag Issue

I have a Kafka cluster with 3 brokers and 3 ZooKeeper nodes, deployed with the Strimzi template for OpenShift 3.11. My topics have 20 or 30 partitions. I can't diagnose exactly why it happens, but after some unknown event some of the partitions start to lag. Lag is normal when you can't process the data as fast as it is produced, so I stopped the producers to observe what was happening. And that is the weird part.

> sg-1-ProductDate sfz.public.ProductDate 23         31564011        35276642        3712631         sync-client-synchronizer-62-tg6n2-$ProductDate-77fca4ff-7588-431f-9d52-eda5bed7b2a2 /10.128.4.239   sync-client-synchronizer-62-tg6n2-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 29         -               35270447        -               sync-client-synchronizer-62-ztnm6-$ProductDate-46cd94ee-8607-4430-92fd-5666baaf1010 /10.129.5.193   sync-client-synchronizer-62-ztnm6-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 11         -               35326076        -               sync-client-synchronizer-62-m6v65-$ProductDate-0c5cdba0-1f1f-4c27-afae-3c529ca17186 /10.128.7.75    sync-client-synchronizer-62-m6v65-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 25         -               35027302        -               sync-client-synchronizer-62-v4n7g-$ProductDate-92bad60e-0e38-4738-bb6f-850276e0ed63 /10.128.7.80    sync-client-synchronizer-62-v4n7g-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 26         -               35328140        -               sync-client-synchronizer-62-vfhhh-$ProductDate-17509428-be8c-4c03-b694-738e3e1e1fad /10.129.7.248   sync-client-synchronizer-62-vfhhh-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 4          -               35291468        -               sync-client-synchronizer-62-9n5jg-$ProductDate-e8682966-ae8c-400d-b246-05b485b4d332 /10.128.4.242   sync-client-synchronizer-62-9n5jg-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 9          31430665        35125759        3695094         sync-client-synchronizer-62-jsrfw-$ProductDate-c20d440b-a5f6-4b32-8ccf-db657ae8ec67 /10.130.5.162   sync-client-synchronizer-62-jsrfw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 10         -               35070277        -               sync-client-synchronizer-62-kz6hn-$ProductDate-37ef221c-07f6-4ffd-8a1d-ed556c0c38c5 /10.129.5.197   sync-client-synchronizer-62-kz6hn-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 16         -               35233649        -               sync-client-synchronizer-62-pxl4d-$ProductDate-126c2230-eb5a-4cf8-9d29-f7b516075f03 /10.128.7.79    sync-client-synchronizer-62-pxl4d-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 14         -               35418276        -               sync-client-synchronizer-62-p5bmj-$ProductDate-ffdef648-0b8a-40f0-b5a3-062715ee1a61 /10.129.5.196   sync-client-synchronizer-62-p5bmj-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 6          -               35234349        -               sync-client-synchronizer-62-dwz7n-$ProductDate-2d294290-583d-4ffb-ac83-e0470b7bb8c6 /10.128.7.74    sync-client-synchronizer-62-dwz7n-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 7          31450092        35153556        3703464         sync-client-synchronizer-62-g69hc-$ProductDate-1c1f8765-a01f-436e-8903-1aefb5c5f203 /10.128.4.245   sync-client-synchronizer-62-g69hc-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 17         -               35248649        -               sync-client-synchronizer-62-qfz9f-$ProductDate-73a40da6-8e80-4de4-a0ff-a4ad6904d651 /10.128.4.244   sync-client-synchronizer-62-qfz9f-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 5          -               35354481        -               sync-client-synchronizer-62-cjhts-$ProductDate-084892b8-2918-480e-9fcc-d6b1e8dcfe22 /10.128.4.240   sync-client-synchronizer-62-cjhts-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 3          32775591        35174921        2399330         sync-client-synchronizer-62-5wls7-$ProductDate-ec1f3855-ca3f-4add-b0c5-9c991dd3733e /10.128.4.243   sync-client-synchronizer-62-5wls7-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 1          -               35151480        -               sync-client-synchronizer-62-2f7ln-$ProductDate-47ab446a-356e-41b8-8224-2de3ca93eaf8 /10.128.7.76    sync-client-synchronizer-62-2f7ln-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 12         31637933        35359850        3721917         sync-client-synchronizer-62-ndfct-$ProductDate-baeb1d9b-3574-41e3-b976-7f73808bf4d3 /10.129.7.250   sync-client-synchronizer-62-ndfct-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 27         -               35247067        -               sync-client-synchronizer-62-vz2hv-$ProductDate-0c3022e2-4688-49a5-bf3c-77c0fed69a38 /10.130.5.163   sync-client-synchronizer-62-vz2hv-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 2          -               35278219        -               sync-client-synchronizer-62-5792q-$ProductDate-75d63e1e-23ec-4378-b506-2315f043307d /10.128.7.78    sync-client-synchronizer-62-5792q-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 20         -               35471162        -               sync-client-synchronizer-62-sqrbg-$ProductDate-78d31620-d4e2-4eff-880d-d79e9cffaeff /10.130.5.165   sync-client-synchronizer-62-sqrbg-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 0          -               35099257        -               sync-client-synchronizer-62-2cngk-$ProductDate-de0797dc-8592-46ca-98d5-32c0ccc0cc69 /10.130.5.161   sync-client-synchronizer-62-2cngk-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 8          -               35370008        -               sync-client-synchronizer-62-j8xnw-$ProductDate-d436268c-8b55-4045-95b0-e11a60d457f5 /10.128.7.73    sync-client-synchronizer-62-j8xnw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 28         -               35427999        -               sync-client-synchronizer-62-wkrjz-$ProductDate-6ea118b3-f2b2-402b-bd67-5e2d90e7af95 /10.128.4.241   sync-client-synchronizer-62-wkrjz-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 21         -               35338947        -               sync-client-synchronizer-62-t8k4m-$ProductDate-7363dc4b-41f7-4657-ba0d-25e766646ceb /10.130.5.164   sync-client-synchronizer-62-t8k4m-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 19         -               35243179        -               sync-client-synchronizer-62-rwz5p-$ProductDate-36b78d4b-8ae8-4f59-aa47-18cb2ef6db36 /10.129.7.251   sync-client-synchronizer-62-rwz5p-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 15         -               35220847        -               sync-client-synchronizer-62-p7lnw-$ProductDate-292de9f2-a77f-4cab-adda-52a6d77def08 /10.129.5.194   sync-client-synchronizer-62-p7lnw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 13         -               35334642        -               sync-client-synchronizer-62-nrxn9-$ProductDate-8de8a7da-a27b-40b2-9f80-8099f7072a31 /10.129.7.249   sync-client-synchronizer-62-nrxn9-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 24         -               35502024        -               sync-client-synchronizer-62-twc5z-$ProductDate-76c9a19d-4c39-4bcd-8591-11919d9a5bdb /10.128.7.77    sync-client-synchronizer-62-twc5z-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 22         -               35303302        -               sync-client-synchronizer-62-tdjnm-$ProductDate-947a2771-07ae-4c3b-a23d-0eabae7476cb /10.129.7.252   sync-client-synchronizer-62-tdjnm-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 18         -               35268532        -               sync-client-synchronizer-62-rfpns-$ProductDate-dab925cb-573d-4fe4-add6-de393b4f8c9d /10.129.5.195   sync-client-synchronizer-62-rfpns-$ProductDate

Above are the lags observed at time T1. When I ran the same command a few minutes later, the CURRENT-OFFSET values had not moved forward. On the client side I cannot see any exceptions or warnings. I am using the Confluent.Kafka .NET client. Restarting the consumers does not help either. My clients just subscribe to the topics, so I am not manually assigning partitions.

This is the output of the same command a few minutes later:

sg-1-ProductDate sfz.public.ProductDate 23         31564011        35279144        3715133         sync-client-synchronizer-62-tg6n2-$ProductDate-77fca4ff-7588-431f-9d52-eda5bed7b2a2 /10.128.4.239   sync-client-synchronizer-62-tg6n2-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 29         -               35272884        -               sync-client-synchronizer-62-ztnm6-$ProductDate-46cd94ee-8607-4430-92fd-5666baaf1010 /10.129.5.193   sync-client-synchronizer-62-ztnm6-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 11         -               35328433        -               sync-client-synchronizer-62-m6v65-$ProductDate-0c5cdba0-1f1f-4c27-afae-3c529ca17186 /10.128.7.75    sync-client-synchronizer-62-m6v65-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 25         -               35029662        -               sync-client-synchronizer-62-v4n7g-$ProductDate-92bad60e-0e38-4738-bb6f-850276e0ed63 /10.128.7.80    sync-client-synchronizer-62-v4n7g-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 26         -               35330594        -               sync-client-synchronizer-62-vfhhh-$ProductDate-17509428-be8c-4c03-b694-738e3e1e1fad /10.129.7.248   sync-client-synchronizer-62-vfhhh-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 4          -               35293987        -               sync-client-synchronizer-62-9n5jg-$ProductDate-e8682966-ae8c-400d-b246-05b485b4d332 /10.128.4.242   sync-client-synchronizer-62-9n5jg-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 9          31430665        35128171        3697506         sync-client-synchronizer-62-jsrfw-$ProductDate-c20d440b-a5f6-4b32-8ccf-db657ae8ec67 /10.130.5.162   sync-client-synchronizer-62-jsrfw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 10         -               35072671        -               sync-client-synchronizer-62-kz6hn-$ProductDate-37ef221c-07f6-4ffd-8a1d-ed556c0c38c5 /10.129.5.197   sync-client-synchronizer-62-kz6hn-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 16         -               35236110        -               sync-client-synchronizer-62-pxl4d-$ProductDate-126c2230-eb5a-4cf8-9d29-f7b516075f03 /10.128.7.79    sync-client-synchronizer-62-pxl4d-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 14         -               35420724        -               sync-client-synchronizer-62-p5bmj-$ProductDate-ffdef648-0b8a-40f0-b5a3-062715ee1a61 /10.129.5.196   sync-client-synchronizer-62-p5bmj-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 6          -               35236782        -               sync-client-synchronizer-62-dwz7n-$ProductDate-2d294290-583d-4ffb-ac83-e0470b7bb8c6 /10.128.7.74    sync-client-synchronizer-62-dwz7n-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 7          31450092        35155969        3705877         sync-client-synchronizer-62-g69hc-$ProductDate-1c1f8765-a01f-436e-8903-1aefb5c5f203 /10.128.4.245   sync-client-synchronizer-62-g69hc-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 17         -               35251086        -               sync-client-synchronizer-62-qfz9f-$ProductDate-73a40da6-8e80-4de4-a0ff-a4ad6904d651 /10.128.4.244   sync-client-synchronizer-62-qfz9f-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 5          -               35356782        -               sync-client-synchronizer-62-cjhts-$ProductDate-084892b8-2918-480e-9fcc-d6b1e8dcfe22 /10.128.4.240   sync-client-synchronizer-62-cjhts-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 3          32775591        35177356        2401765         sync-client-synchronizer-62-5wls7-$ProductDate-ec1f3855-ca3f-4add-b0c5-9c991dd3733e /10.128.4.243   sync-client-synchronizer-62-5wls7-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 1          -               35153865        -               sync-client-synchronizer-62-2f7ln-$ProductDate-47ab446a-356e-41b8-8224-2de3ca93eaf8 /10.128.7.76    sync-client-synchronizer-62-2f7ln-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 12         31637933        35362407        3724474         sync-client-synchronizer-62-ndfct-$ProductDate-baeb1d9b-3574-41e3-b976-7f73808bf4d3 /10.129.7.250   sync-client-synchronizer-62-ndfct-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 27         -               35249564        -               sync-client-synchronizer-62-vz2hv-$ProductDate-0c3022e2-4688-49a5-bf3c-77c0fed69a38 /10.130.5.163   sync-client-synchronizer-62-vz2hv-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 2          -               35280630        -               sync-client-synchronizer-62-5792q-$ProductDate-75d63e1e-23ec-4378-b506-2315f043307d /10.128.7.78    sync-client-synchronizer-62-5792q-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 20         -               35473631        -               sync-client-synchronizer-62-sqrbg-$ProductDate-78d31620-d4e2-4eff-880d-d79e9cffaeff /10.130.5.165   sync-client-synchronizer-62-sqrbg-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 0          -               35101687        -               sync-client-synchronizer-62-2cngk-$ProductDate-de0797dc-8592-46ca-98d5-32c0ccc0cc69 /10.130.5.161   sync-client-synchronizer-62-2cngk-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 8          -               35372522        -               sync-client-synchronizer-62-j8xnw-$ProductDate-d436268c-8b55-4045-95b0-e11a60d457f5 /10.128.7.73    sync-client-synchronizer-62-j8xnw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 28         -               35430364        -               sync-client-synchronizer-62-wkrjz-$ProductDate-6ea118b3-f2b2-402b-bd67-5e2d90e7af95 /10.128.4.241   sync-client-synchronizer-62-wkrjz-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 21         -               35341353        -               sync-client-synchronizer-62-t8k4m-$ProductDate-7363dc4b-41f7-4657-ba0d-25e766646ceb /10.130.5.164   sync-client-synchronizer-62-t8k4m-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 19         -               35245578        -               sync-client-synchronizer-62-rwz5p-$ProductDate-36b78d4b-8ae8-4f59-aa47-18cb2ef6db36 /10.129.7.251   sync-client-synchronizer-62-rwz5p-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 15         -               35223232        -               sync-client-synchronizer-62-p7lnw-$ProductDate-292de9f2-a77f-4cab-adda-52a6d77def08 /10.129.5.194   sync-client-synchronizer-62-p7lnw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 13         -               35337134        -               sync-client-synchronizer-62-nrxn9-$ProductDate-8de8a7da-a27b-40b2-9f80-8099f7072a31 /10.129.7.249   sync-client-synchronizer-62-nrxn9-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 24         -               35504549        -               sync-client-synchronizer-62-twc5z-$ProductDate-76c9a19d-4c39-4bcd-8591-11919d9a5bdb /10.128.7.77    sync-client-synchronizer-62-twc5z-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 22         -               35305746        -               sync-client-synchronizer-62-tdjnm-$ProductDate-947a2771-07ae-4c3b-a23d-0eabae7476cb /10.129.7.252   sync-client-synchronizer-62-tdjnm-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 18         -               35270985        -               sync-client-synchronizer-62-rfpns-$ProductDate-dab925cb-573d-4fe4-add6-de393b4f8c9d /10.129.5.195   sync-client-synchronizer-62-rfpns-$ProductDate
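
To make the comparison between the two listings less error-prone than reading them by eye, here is a minimal sketch that parses both snapshots and reports the partitions whose committed offset stayed the same while the log end offset grew. It assumes the column order shown above (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, ...); the names PartitionRow, LagSnapshot, Parse and Stalled are hypothetical helpers, not part of Kafka or Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One row of the consumer group listing, reduced to the numeric columns.
// CurrentOffset is null when the listing shows "-" (no committed offset).
public record PartitionRow(int Partition, long? CurrentOffset, long LogEndOffset);

public static class LagSnapshot
{
    // Parses whitespace-separated rows laid out as:
    // GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
    // A leading "> " quote marker is tolerated; header or malformed rows are skipped.
    public static Dictionary<int, PartitionRow> Parse(string output)
    {
        var rows = new Dictionary<int, PartitionRow>();
        foreach (var line in output.Split('\n', StringSplitOptions.RemoveEmptyEntries))
        {
            var cols = line.Trim().TrimStart('>')
                           .Split(' ', StringSplitOptions.RemoveEmptyEntries);
            if (cols.Length < 5) continue;
            if (!int.TryParse(cols[2], out var partition)) continue;
            if (!long.TryParse(cols[4], out var logEnd)) continue;

            long? current = long.TryParse(cols[3], out var c) ? c : (long?)null;
            rows[partition] = new PartitionRow(partition, current, logEnd);
        }
        return rows;
    }

    // Partitions that have a committed offset which did not change between
    // the two snapshots even though the log end offset advanced.
    public static List<int> Stalled(
        IReadOnlyDictionary<int, PartitionRow> t1,
        IReadOnlyDictionary<int, PartitionRow> t2) =>
        t1.Values
          .Where(a => a.CurrentOffset.HasValue
                   && t2.TryGetValue(a.Partition, out var b)
                   && b.CurrentOffset == a.CurrentOffset
                   && b.LogEndOffset > a.LogEndOffset)
          .Select(a => a.Partition)
          .OrderBy(p => p)
          .ToList();
}
```

Calling LagSnapshot.Stalled(LagSnapshot.Parse(first), LagSnapshot.Parse(second)) with the rows for partitions 23 and 29 from both listings returns a list containing only 23: its CURRENT-OFFSET is 31564011 in both, while partition 29 is excluded because it shows "-" instead of a committed offset.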

My Kafka version is 2.6.0 and the Confluent.Kafka package version is 1.9.3. My consumer applications run in Docker containers using the image mcr.microsoft.com/dotnet/sdk:6.0-alpine.

Below is my consumer code:

private async Task SubscribeToTableChanges(string tableName)
        {
            this.logger.LogWarning("Consumer is starting");
            var rawTableName = tableName.Split('.')[^1];
            var config = new ConsumerConfig
            {
                BootstrapServers = this.settings.KafkaUrl,
                EnableSslCertificateVerification = false,
                GroupId = $"sg-1-{rawTableName}",
                AllowAutoCreateTopics = true,
                ClientId = $"sync-client-{this.hostName}-${rawTableName}",
                EnableAutoCommit = false,
                FetchWaitMaxMs = 1000,
                FetchMinBytes = 16384,
                FetchMaxBytes = 3 * 52428800,
                MessageMaxBytes = 3 * 52428800,
                MaxPartitionFetchBytes = 4 * 52428800,
                HeartbeatIntervalMs = 3000,
                SessionTimeoutMs = 20000,
                AutoOffsetReset = AutoOffsetReset.Latest,
                SocketTimeoutMs = 120000,
                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
            };


            counters.TryAdd(rawTableName, 0);
            var partition = 0;

            var topics = tableName;
            using var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    foreach (var item in partitions)
                    {
                        partition = item.Partition.Value;
                        this.logger.LogWarning("Assigned to partition : {Part} with topic : {Topic} ", item.Partition.Value, item.Topic);
                    }
                })
                .SetErrorHandler((_, e) => this.logger.LogError($"Error: {e.Reason}"))
                .SetStatisticsHandler((_, json) => this.logger.LogError($"Statistics: {json}"))
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    var remaining = c.Assignment.Where(atp => partitions.Where(rtp => rtp.TopicPartition == atp).Count() == 0);
                    this.logger.LogError(
                        "Partitions incrementally revoked: [" +
                        string.Join(',', partitions.Select(p => p.Partition.Value)) +
                        "], remaining: [" +
                        string.Join(',', remaining.Select(p => p.Partition.Value)) +
                        "]");
                })
                .SetPartitionsLostHandler((c, partitions) =>
                {
                    this.logger.LogError($"Partitions were lost: [{string.Join(", ", partitions)}]");
                })
                .Build();
            var skipThese = this.settings.Sync.SkipThese;

           
            this.logger.LogInformation("Subscribing to the topics : {topics}", topics);
            consumer.Subscribe(topics);
            this.logger.LogInformation("Subscribed to the topics : {topics}", topics);
     

            while (!this.cancellation.IsCancellationRequested)
            {
                var offsets = new Queue<TopicPartitionOffset>();
                try
                {
                    var data = consumer.Consume(TimeSpan.FromSeconds(3));
                    if (data == null)
                    {
                        await Task.Delay(100);
                        continue;
                    }

                    if (data.IsPartitionEOF)
                    {
                        await Task.Delay(100);
                        continue;
                    }

                    unchecked
                    {
                        Increment(rawTableName);
                    }

                    if (string.IsNullOrWhiteSpace(data.Message.Value) || skipThese.Contains(tableName))
                    {
                        this.logger.LogInformation("Empty message received from topic {topic}", data.Topic);
                        offsets.Enqueue(data.TopicPartitionOffset);
                        continue;
                    }

                    if (data.TopicPartitionOffset.Offset.Value % 1000 == 0)
                    {
                        this.logger.LogInformation("{topic}[{partition}] offset is : {offset}", rawTableName, partition, data.TopicPartitionOffset.Offset.Value);
                    }

                    using var doc = JsonDocument.Parse(data.Message.Value);
                    var jobject = doc.RootElement;
                    var source = data.Topic.Split(".").Last();
                    var op = jobject.GetProperty("op").GetString();
                    var before = jobject.GetProperty("before");
                    var after = jobject.GetProperty("after");
                    var timeStamp = jobject.GetProperty("source").GetProperty("ts_ms").GetInt64();

                    if (op == "u")
                    {
                        if (before.ValueKind == JsonValueKind.Null)
                        {
                            offsets.Enqueue(data.TopicPartitionOffset);

                            continue;
                        }

                        await UpdateHandler(before, after, source, timeStamp);
                    }

                    offsets.Enqueue(data.TopicPartitionOffset);

                   
                    if (offsets.Count >= this.settings.OffsetLimit || this.cancellation.IsCancellationRequested)
                    {
                        var start = DateTime.Now;
                        consumer.Commit(offsets);
                        if (tableName == nameof(ProductToMarketPlaceDate))
                        {
                            this.logger.LogWarning("Committed {Offset} offsets in {Ms}ms", offsets.Count, (DateTime.Now - start).TotalMilliseconds);
                        }
                        offsets.Clear();
                    }

                }
                catch (Exception ex)
                {
                    this.logger.LogCritical("An error occurred while consuming : {ex} ", ex);
                    await Task.Delay(100, this.cancellation.Token);
                }
            }
        }

In the handler called by await UpdateHandler(before, after, source, timeStamp); I basically reshape the data and insert it into Redis. This configuration and code worked for months without any lag. But once a lag occurs, the offsets of those partitions never move forward.

The workarounds that have worked so far, none of which I like, are:

I would be happy to be advised about how to diagnose the issue. Any client-side or server-side suggestions would be much appreciated.

Upvotes: 1

Views: 351

Answers (0)
