Nataraj Vedula

Reputation: 1

Kafka Streams in .Net - Implementing IConsumerRebalanceListener

We use the Streamiz.Kafka.Net package to build a Kafka Streams application that reads a source topic, transforms the data, and writes the enriched data to another topic.

We want to make sure the stream does not produce duplicates by processing the same source-topic message more than once. We see this mainly when we stop and restart the stream, for example when a server is restarted for scheduled maintenance.

How can deduplication be applied to a Kafka Streams application in .NET? Alternatively, is there sample code showing how to implement IConsumerRebalanceListener in a Kafka Streams application and handle PartitionsRevoked?

So far we have configured the stream so that it avoids duplicates as long as it runs continuously, by setting the properties below:

var streamConfig = new StreamConfig()
{
    ApplicationId = appSettings.GetValue<string>("KafkaStreamAPIConfig:StreamConfigAppId"),
    BootstrapServers = appSettings.GetValue<string>("KafkaStreamAPIConfig:BootstrapServers"),
    DefaultKeySerDes = new StringSerDes(),
    DefaultValueSerDes = new StringSerDes(),
    ParallelProcessing = true,
    MaxPollRecords = 1,
    // Only applies when the consumer group has no committed offset yet
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,

    NumStreamThreads = 60,
    CommitIntervalMs = 100,
    LogThreadName = true,
    ClientId = appSettings.GetValue<string>("KafkaStreamAPIConfig:ClientId"),
    Acks = Confluent.Kafka.Acks.All,
    PartitionAssignmentStrategy = Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin,
    EnableIdempotence = true,
    HeartbeatIntervalMs = 100000,

    SessionTimeoutMs = 300000,
    MaxPollIntervalMs = 300000
};

StreamBuilder builder = new StreamBuilder();

IKStream<string, string> ks = builder.Stream<string, string>(appSettings.GetValue<string>("KafkaStreamAPIConfig:SourceTopicName"));

// Transform each value, drop empty results, and write the rest to the destination topic
ks.MapValues((k, v) => RequestTransformation(k, v, this).Result)
    .Filter((k, v) => v != String.Empty)
    .To(appSettings.GetValue<string>("KafkaStreamAPIConfig:DestNotifyTopic"), new StringSerDes(), new StringSerDes());
Topology t = builder.Build();

// Instantiate and start the Kafka stream
Streamiz.Kafka.Net.KafkaStream stream = new Streamiz.Kafka.Net.KafkaStream(t, streamConfig);

await stream.StartAsync();

However, we also want to commit offsets when a rebalance occurs because the stream is stopped explicitly.
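
To make the stop-and-restart scenario concrete, here is a minimal sketch of one direction that might be worth trying. It is not a confirmed fix. It assumes that the installed Streamiz.Kafka.Net version exposes the Guarantee setting with ProcessingGuarantee.EXACTLY_ONCE, and that calling Dispose() on the stream closes its tasks cleanly so that pending offsets are committed before the consumers leave the group. The StreamHost class, the RunAsync method, its parameters, and the application id are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

// Hypothetical host class, for illustration only.
public static class StreamHost
{
    private static int disposed; // 0 = running, 1 = Dispose already requested

    public static async Task RunAsync(string bootstrapServers, string sourceTopic, string destTopic)
    {
        var config = new StreamConfig
        {
            ApplicationId = "dedup-sketch-app", // hypothetical id
            BootstrapServers = bootstrapServers,
            DefaultKeySerDes = new StringSerDes(),
            DefaultValueSerDes = new StringSerDes(),
            // Assumption: this setting is available in the installed version.
            // It asks the library to write output and commit offsets transactionally.
            Guarantee = ProcessingGuarantee.EXACTLY_ONCE
        };

        var builder = new StreamBuilder();
        builder.Stream<string, string>(sourceTopic)
               .To(destTopic, new StringSerDes(), new StringSerDes());

        var stream = new KafkaStream(builder.Build(), config);

        // Dispose the stream at most once, whichever shutdown signal arrives first.
        void Shutdown()
        {
            if (Interlocked.Exchange(ref disposed, 1) == 0)
            {
                stream.Dispose();
            }
        }

        Console.CancelKeyPress += (sender, e) => Shutdown();            // Ctrl+C
        AppDomain.CurrentDomain.ProcessExit += (sender, e) => Shutdown(); // normal process exit

        await stream.StartAsync();
    }
}
```

If this works as assumed, a planned maintenance restart would go through Shutdown() instead of killing the process. Note that with a transactional guarantee, consumers of the destination topic would need to read with isolation level read_committed to avoid seeing records from aborted transactions.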

Upvotes: 0

Views: 185

Answers (0)
