Reputation: 10790
I have a kafka cluster with 3 brokers and 3 zoo-keepers and using strimzi template for openshift 3.11. And I have topics with 20 or 30 partitions. Well I can't diagnose why it is happening exactly why but after some mysterious event some of the partitions start to have a lag. Having a lag is normal thing when you can't process the data as fast as it is produced. So I stop to producers to observe what is happening. And that is the weird part.
> sg-1-ProductDate sfz.public.ProductDate 23 31564011 35276642 3712631 sync-client-synchronizer-62-tg6n2-$ProductDate-77fca4ff-7588-431f-9d52-eda5bed7b2a2 /10.128.4.239 sync-client-synchronizer-62-tg6n2-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 29 - 35270447 - sync-client-synchronizer-62-ztnm6-$ProductDate-46cd94ee-8607-4430-92fd-5666baaf1010 /10.129.5.193 sync-client-synchronizer-62-ztnm6-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 11 - 35326076 - sync-client-synchronizer-62-m6v65-$ProductDate-0c5cdba0-1f1f-4c27-afae-3c529ca17186 /10.128.7.75 sync-client-synchronizer-62-m6v65-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 25 - 35027302 - sync-client-synchronizer-62-v4n7g-$ProductDate-92bad60e-0e38-4738-bb6f-850276e0ed63 /10.128.7.80 sync-client-synchronizer-62-v4n7g-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 26 - 35328140 - sync-client-synchronizer-62-vfhhh-$ProductDate-17509428-be8c-4c03-b694-738e3e1e1fad /10.129.7.248 sync-client-synchronizer-62-vfhhh-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 4 - 35291468 - sync-client-synchronizer-62-9n5jg-$ProductDate-e8682966-ae8c-400d-b246-05b485b4d332 /10.128.4.242 sync-client-synchronizer-62-9n5jg-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 9 31430665 35125759 3695094 sync-client-synchronizer-62-jsrfw-$ProductDate-c20d440b-a5f6-4b32-8ccf-db657ae8ec67 /10.130.5.162 sync-client-synchronizer-62-jsrfw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 10 - 35070277 - sync-client-synchronizer-62-kz6hn-$ProductDate-37ef221c-07f6-4ffd-8a1d-ed556c0c38c5 /10.129.5.197 sync-client-synchronizer-62-kz6hn-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 16 - 35233649 - sync-client-synchronizer-62-pxl4d-$ProductDate-126c2230-eb5a-4cf8-9d29-f7b516075f03 /10.128.7.79 sync-client-synchronizer-62-pxl4d-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 14 - 35418276 - sync-client-synchronizer-62-p5bmj-$ProductDate-ffdef648-0b8a-40f0-b5a3-062715ee1a61 /10.129.5.196 sync-client-synchronizer-62-p5bmj-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 6 - 35234349 - sync-client-synchronizer-62-dwz7n-$ProductDate-2d294290-583d-4ffb-ac83-e0470b7bb8c6 /10.128.7.74 sync-client-synchronizer-62-dwz7n-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 7 31450092 35153556 3703464 sync-client-synchronizer-62-g69hc-$ProductDate-1c1f8765-a01f-436e-8903-1aefb5c5f203 /10.128.4.245 sync-client-synchronizer-62-g69hc-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 17 - 35248649 - sync-client-synchronizer-62-qfz9f-$ProductDate-73a40da6-8e80-4de4-a0ff-a4ad6904d651 /10.128.4.244 sync-client-synchronizer-62-qfz9f-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 5 - 35354481 - sync-client-synchronizer-62-cjhts-$ProductDate-084892b8-2918-480e-9fcc-d6b1e8dcfe22 /10.128.4.240 sync-client-synchronizer-62-cjhts-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 3 32775591 35174921 2399330 sync-client-synchronizer-62-5wls7-$ProductDate-ec1f3855-ca3f-4add-b0c5-9c991dd3733e /10.128.4.243 sync-client-synchronizer-62-5wls7-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 1 - 35151480 - sync-client-synchronizer-62-2f7ln-$ProductDate-47ab446a-356e-41b8-8224-2de3ca93eaf8 /10.128.7.76 sync-client-synchronizer-62-2f7ln-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 12 31637933 35359850 3721917 sync-client-synchronizer-62-ndfct-$ProductDate-baeb1d9b-3574-41e3-b976-7f73808bf4d3 /10.129.7.250 sync-client-synchronizer-62-ndfct-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 27 - 35247067 - sync-client-synchronizer-62-vz2hv-$ProductDate-0c3022e2-4688-49a5-bf3c-77c0fed69a38 /10.130.5.163 sync-client-synchronizer-62-vz2hv-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 2 - 35278219 - sync-client-synchronizer-62-5792q-$ProductDate-75d63e1e-23ec-4378-b506-2315f043307d /10.128.7.78 sync-client-synchronizer-62-5792q-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 20 - 35471162 - sync-client-synchronizer-62-sqrbg-$ProductDate-78d31620-d4e2-4eff-880d-d79e9cffaeff /10.130.5.165 sync-client-synchronizer-62-sqrbg-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 0 - 35099257 - sync-client-synchronizer-62-2cngk-$ProductDate-de0797dc-8592-46ca-98d5-32c0ccc0cc69 /10.130.5.161 sync-client-synchronizer-62-2cngk-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 8 - 35370008 - sync-client-synchronizer-62-j8xnw-$ProductDate-d436268c-8b55-4045-95b0-e11a60d457f5 /10.128.7.73 sync-client-synchronizer-62-j8xnw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 28 - 35427999 - sync-client-synchronizer-62-wkrjz-$ProductDate-6ea118b3-f2b2-402b-bd67-5e2d90e7af95 /10.128.4.241 sync-client-synchronizer-62-wkrjz-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 21 - 35338947 - sync-client-synchronizer-62-t8k4m-$ProductDate-7363dc4b-41f7-4657-ba0d-25e766646ceb /10.130.5.164 sync-client-synchronizer-62-t8k4m-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 19 - 35243179 - sync-client-synchronizer-62-rwz5p-$ProductDate-36b78d4b-8ae8-4f59-aa47-18cb2ef6db36 /10.129.7.251 sync-client-synchronizer-62-rwz5p-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 15 - 35220847 - sync-client-synchronizer-62-p7lnw-$ProductDate-292de9f2-a77f-4cab-adda-52a6d77def08 /10.129.5.194 sync-client-synchronizer-62-p7lnw-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 13 - 35334642 - sync-client-synchronizer-62-nrxn9-$ProductDate-8de8a7da-a27b-40b2-9f80-8099f7072a31 /10.129.7.249 sync-client-synchronizer-62-nrxn9-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 24 - 35502024 - sync-client-synchronizer-62-twc5z-$ProductDate-76c9a19d-4c39-4bcd-8591-11919d9a5bdb /10.128.7.77 sync-client-synchronizer-62-twc5z-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 22 - 35303302 - sync-client-synchronizer-62-tdjnm-$ProductDate-947a2771-07ae-4c3b-a23d-0eabae7476cb /10.129.7.252 sync-client-synchronizer-62-tdjnm-$ProductDate
> sg-1-ProductDate sfz.public.ProductDate 18 - 35268532 - sync-client-synchronizer-62-rfpns-$ProductDate-dab925cb-573d-4fe4-add6-de393b4f8c9d /10.129.5.195 sync-client-synchronizer-62-rfpns-$ProductDate
Above is the lags observed in T1 time. And after a few minutes when I ran the same command I observe those CURRENT-OFFSETS never move forward. And the client side I can not observe any exceptions or warnings. I am using Confluent.Kafka
.net client. Restarting consumers is also not helping. In my clients I just subscribe to the topics so I am not manually selecting partitions.
This is the log of the same command after a few minutes :
sg-1-ProductDate sfz.public.ProductDate 23 31564011 35279144 3715133 sync-client-synchronizer-62-tg6n2-$ProductDate-77fca4ff-7588-431f-9d52-eda5bed7b2a2 /10.128.4.239 sync-client-synchronizer-62-tg6n2-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 29 - 35272884 - sync-client-synchronizer-62-ztnm6-$ProductDate-46cd94ee-8607-4430-92fd-5666baaf1010 /10.129.5.193 sync-client-synchronizer-62-ztnm6-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 11 - 35328433 - sync-client-synchronizer-62-m6v65-$ProductDate-0c5cdba0-1f1f-4c27-afae-3c529ca17186 /10.128.7.75 sync-client-synchronizer-62-m6v65-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 25 - 35029662 - sync-client-synchronizer-62-v4n7g-$ProductDate-92bad60e-0e38-4738-bb6f-850276e0ed63 /10.128.7.80 sync-client-synchronizer-62-v4n7g-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 26 - 35330594 - sync-client-synchronizer-62-vfhhh-$ProductDate-17509428-be8c-4c03-b694-738e3e1e1fad /10.129.7.248 sync-client-synchronizer-62-vfhhh-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 4 - 35293987 - sync-client-synchronizer-62-9n5jg-$ProductDate-e8682966-ae8c-400d-b246-05b485b4d332 /10.128.4.242 sync-client-synchronizer-62-9n5jg-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 9 31430665 35128171 3697506 sync-client-synchronizer-62-jsrfw-$ProductDate-c20d440b-a5f6-4b32-8ccf-db657ae8ec67 /10.130.5.162 sync-client-synchronizer-62-jsrfw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 10 - 35072671 - sync-client-synchronizer-62-kz6hn-$ProductDate-37ef221c-07f6-4ffd-8a1d-ed556c0c38c5 /10.129.5.197 sync-client-synchronizer-62-kz6hn-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 16 - 35236110 - sync-client-synchronizer-62-pxl4d-$ProductDate-126c2230-eb5a-4cf8-9d29-f7b516075f03 /10.128.7.79 sync-client-synchronizer-62-pxl4d-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 14 - 35420724 - sync-client-synchronizer-62-p5bmj-$ProductDate-ffdef648-0b8a-40f0-b5a3-062715ee1a61 /10.129.5.196 sync-client-synchronizer-62-p5bmj-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 6 - 35236782 - sync-client-synchronizer-62-dwz7n-$ProductDate-2d294290-583d-4ffb-ac83-e0470b7bb8c6 /10.128.7.74 sync-client-synchronizer-62-dwz7n-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 7 31450092 35155969 3705877 sync-client-synchronizer-62-g69hc-$ProductDate-1c1f8765-a01f-436e-8903-1aefb5c5f203 /10.128.4.245 sync-client-synchronizer-62-g69hc-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 17 - 35251086 - sync-client-synchronizer-62-qfz9f-$ProductDate-73a40da6-8e80-4de4-a0ff-a4ad6904d651 /10.128.4.244 sync-client-synchronizer-62-qfz9f-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 5 - 35356782 - sync-client-synchronizer-62-cjhts-$ProductDate-084892b8-2918-480e-9fcc-d6b1e8dcfe22 /10.128.4.240 sync-client-synchronizer-62-cjhts-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 3 32775591 35177356 2401765 sync-client-synchronizer-62-5wls7-$ProductDate-ec1f3855-ca3f-4add-b0c5-9c991dd3733e /10.128.4.243 sync-client-synchronizer-62-5wls7-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 1 - 35153865 - sync-client-synchronizer-62-2f7ln-$ProductDate-47ab446a-356e-41b8-8224-2de3ca93eaf8 /10.128.7.76 sync-client-synchronizer-62-2f7ln-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 12 31637933 35362407 3724474 sync-client-synchronizer-62-ndfct-$ProductDate-baeb1d9b-3574-41e3-b976-7f73808bf4d3 /10.129.7.250 sync-client-synchronizer-62-ndfct-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 27 - 35249564 - sync-client-synchronizer-62-vz2hv-$ProductDate-0c3022e2-4688-49a5-bf3c-77c0fed69a38 /10.130.5.163 sync-client-synchronizer-62-vz2hv-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 2 - 35280630 - sync-client-synchronizer-62-5792q-$ProductDate-75d63e1e-23ec-4378-b506-2315f043307d /10.128.7.78 sync-client-synchronizer-62-5792q-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 20 - 35473631 - sync-client-synchronizer-62-sqrbg-$ProductDate-78d31620-d4e2-4eff-880d-d79e9cffaeff /10.130.5.165 sync-client-synchronizer-62-sqrbg-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 0 - 35101687 - sync-client-synchronizer-62-2cngk-$ProductDate-de0797dc-8592-46ca-98d5-32c0ccc0cc69 /10.130.5.161 sync-client-synchronizer-62-2cngk-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 8 - 35372522 - sync-client-synchronizer-62-j8xnw-$ProductDate-d436268c-8b55-4045-95b0-e11a60d457f5 /10.128.7.73 sync-client-synchronizer-62-j8xnw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 28 - 35430364 - sync-client-synchronizer-62-wkrjz-$ProductDate-6ea118b3-f2b2-402b-bd67-5e2d90e7af95 /10.128.4.241 sync-client-synchronizer-62-wkrjz-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 21 - 35341353 - sync-client-synchronizer-62-t8k4m-$ProductDate-7363dc4b-41f7-4657-ba0d-25e766646ceb /10.130.5.164 sync-client-synchronizer-62-t8k4m-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 19 - 35245578 - sync-client-synchronizer-62-rwz5p-$ProductDate-36b78d4b-8ae8-4f59-aa47-18cb2ef6db36 /10.129.7.251 sync-client-synchronizer-62-rwz5p-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 15 - 35223232 - sync-client-synchronizer-62-p7lnw-$ProductDate-292de9f2-a77f-4cab-adda-52a6d77def08 /10.129.5.194 sync-client-synchronizer-62-p7lnw-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 13 - 35337134 - sync-client-synchronizer-62-nrxn9-$ProductDate-8de8a7da-a27b-40b2-9f80-8099f7072a31 /10.129.7.249 sync-client-synchronizer-62-nrxn9-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 24 - 35504549 - sync-client-synchronizer-62-twc5z-$ProductDate-76c9a19d-4c39-4bcd-8591-11919d9a5bdb /10.128.7.77 sync-client-synchronizer-62-twc5z-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 22 - 35305746 - sync-client-synchronizer-62-tdjnm-$ProductDate-947a2771-07ae-4c3b-a23d-0eabae7476cb /10.129.7.252 sync-client-synchronizer-62-tdjnm-$ProductDate
sg-1-ProductDate sfz.public.ProductDate 18 - 35270985 - sync-client-synchronizer-62-rfpns-$ProductDate-dab925cb-573d-4fe4-add6-de393b4f8c9d /10.129.5.195 sync-client-synchronizer-62-rfpns-$ProductDate
My kafka version is 2.6.0 and Confluent.Kafka package version is 1.9.3. My consumer applications are running in docker container using image : mcr.microsoft.com/dotnet/sdk:6.0-alpine.
Below is my consumer code :
private async Task SubscribeToTableChanges(string tableName)
{
this.logger.LogWarning("Consumer is starting");
var rawTableName = tableName.Split('.')[^1];
var config = new ConsumerConfig
{
BootstrapServers = this.settings.KafkaUrl,
EnableSslCertificateVerification = false,
GroupId = $"sg-1-{rawTableName}",
AllowAutoCreateTopics = true,
ClientId = $"sync-client-{this.hostName}-${rawTableName}",
EnableAutoCommit = false,
FetchWaitMaxMs = 1000,
FetchMinBytes = 16384,
FetchMaxBytes = 3 * 52428800,
MessageMaxBytes = 3 * 52428800,
MaxPartitionFetchBytes = 4 * 52428800,
HeartbeatIntervalMs = 3000,
SessionTimeoutMs = 20000,
AutoOffsetReset = AutoOffsetReset.Latest,
SocketTimeoutMs = 120000,
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};
counters.TryAdd(rawTableName, 0);
var partition = 0;
var topics = tableName;
using var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetPartitionsAssignedHandler((c, partitions) =>
{
foreach (var item in partitions)
{
partition = item.Partition.Value;
this.logger.LogWarning("Assigned to partition : {Part} with topic : {Topic} ", item.Partition.Value, item.Topic);
}
})
.SetErrorHandler((_, e) => this.logger.LogError($"Error: {e.Reason}"))
.SetStatisticsHandler((_, json) => this.logger.LogError($"Statistics: {json}"))
.SetPartitionsRevokedHandler((c, partitions) =>
{
var remaining = c.Assignment.Where(atp => partitions.Where(rtp => rtp.TopicPartition == atp).Count() == 0);
this.logger.LogError(
"Partitions incrementally revoked: [" +
string.Join(',', partitions.Select(p => p.Partition.Value)) +
"], remaining: [" +
string.Join(',', remaining.Select(p => p.Partition.Value)) +
"]");
})
.SetPartitionsLostHandler((c, partitions) =>
{
this.logger.LogError($"Partitions were lost: [{string.Join(", ", partitions)}]");
})
.Build();
var skipThese = this.settings.Sync.SkipThese;
this.logger.LogInformation("Subscribing to the topics : {topics}", topics);
consumer.Subscribe(topics);
this.logger.LogInformation("Subscribed to the topics : {topics}", topics);
while (!this.cancellation.IsCancellationRequested)
{
var offsets = new Queue<TopicPartitionOffset>();
try
{
var data = consumer.Consume(TimeSpan.FromSeconds(3));
if (data == null)
{
await Task.Delay(100);
continue;
}
if (data.IsPartitionEOF)
{
await Task.Delay(100);
continue;
}
unchecked
{
Increment(rawTableName);
}
if (string.IsNullOrWhiteSpace(data.Message.Value) || skipThese.Contains(tableName))
{
this.logger.LogInformation("Empty message received from topic {topic}", data.Topic);
offsets.Enqueue(data.TopicPartitionOffset);
continue;
}
if (data.TopicPartitionOffset.Offset.Value % 1000 == 0)
{
this.logger.LogInformation("{topic}[{partition}] offset is : {offset}", rawTableName, partition, data.TopicPartitionOffset.Offset.Value);
}
using var doc = JsonDocument.Parse(data.Message.Value);
var jobject = doc.RootElement;
var source = data.Topic.Split(".").Last();
var op = jobject.GetProperty("op").GetString();
var before = jobject.GetProperty("before");
var after = jobject.GetProperty("after");
var timeStamp = jobject.GetProperty("source").GetProperty("ts_ms").GetInt64();
if (op == "u")
{
if (before.ValueKind == JsonValueKind.Null)
{
offsets.Enqueue(data.TopicPartitionOffset);
continue;
}
await UpdateHandler(before, after, source, timeStamp);
}
offsets.Enqueue(data.TopicPartitionOffset);
if (offsets.Count >= this.settings.OffsetLimit || t his.cancellation.IsCancellationRequested)
{
var start = DateTime.Now;
consumer.Commit(offsets);
if (tableName == nameof(ProductToMarketPlaceDate))
{
this.logger.LogWarning("Commited {Offset} offsets in {Ms}ms", offsets.Count, (DateTime.Now - start).TotalMilliseconds);
}
offsets.Clear();
}
}
catch (Exception ex)
{
this.logger.LogCritical("An error occured while consuming : {ex} ", ex);
await Task.Delay(100, this.cancellation.Token);
}
}
}
In the await UpdateHandler(before, after, source, timeStamp);
handler I basically reshape the data and insert it to Redis. This configuration and code work for months without any lag. But once a lag occurs that partitions offsets never move forward.
The solutions that I don't like and the working ones are so far :
I would be happy to be advised about how to diagnose the issue. Any client-side or server-side suggestions would be much appreciated.
Upvotes: 1
Views: 351