James

Long-running process times out in KafkaFlow

I have a long-running process, and Kafka times out: it assumes the consumer is no longer there, reassigns the partition, and the message is consumed again. I need a way to process this long-running message while still signaling that the consumer is alive, so Kafka does not give up on it and redeliver the message.

Here is what I have tried:

    // Look up the KafkaFlow consumer that delivered this message
    var consumer = _consumerAccessor[context.ConsumerContext.ConsumerName];

    // The partition this message came from
    var topicPartitionList = new List<TopicPartition>
    {
        new TopicPartition(
            context.ConsumerContext.Topic,
            new Partition(context.ConsumerContext.Partition))
    };

    // Stop fetching from this partition while the file is processed
    consumer.Pause(topicPartitionList);

    var processTask = Task.Run(() => _processFileService.ProcessFile(fileReceivedMsg));

    try
    {
        while (!processTask.IsCompleted)
        {
            consumer.Resume(topicPartitionList);
            // Non-blocking wait; Thread.Sleep would tie up the worker thread
            await Task.Delay(TimeSpan.FromSeconds(10));
            consumer.Pause(topicPartitionList);
        }

        // Observe the task so an exception thrown by ProcessFile is not lost
        await processTask;
    }
    finally
    {
        // Resume the partition even if processing failed
        consumer.Resume(topicPartitionList);
    }

Has anyone ever done this using KafkaFlow?

Answers (2)

Filipe Ferreira Esch

For a long-running process, it is a good idea to use the WorkerStopped cancellation token of IMessageContext.ConsumerContext. This token is canceled 30 seconds after a worker is requested to stop. Using it keeps your application from becoming unresponsive during deploys or partition rebalances. The 30-second window can be configured in the consumer's setup with the WithWorkerStopTimeout() method.
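Applied to the file-processing scenario in the question, honoring that token might look like the minimal sketch below. It assumes the work can be split into chunks and checks the token between them. ProcessFileAsync, chunkCount and perChunk are hypothetical names standing in for the asker's _processFileService.ProcessFile, and a local CancellationTokenSource plays the role of context.ConsumerContext.WorkerStopped so the example runs without a broker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningWork
{
    // Hypothetical stand-in for ProcessFile: does the work in chunks, checks the
    // token between chunks and passes it to each awaited step, so a stop request
    // is honored promptly instead of after the whole file.
    public static async Task<int> ProcessFileAsync(int chunkCount, TimeSpan perChunk, CancellationToken stopToken)
    {
        var processed = 0;
        for (var i = 0; i < chunkCount; i++)
        {
            // Throws OperationCanceledException once a stop has been requested
            stopToken.ThrowIfCancellationRequested();

            // Simulated unit of work; Task.Delay also aborts early if the token fires
            await Task.Delay(perChunk, stopToken);
            processed++;
        }
        return processed;
    }

    public static async Task Main()
    {
        // A local source plays the role of context.ConsumerContext.WorkerStopped;
        // CancelAfter simulates the worker being asked to stop mid-processing.
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromMilliseconds(250));

        try
        {
            // 100 chunks x 50 ms would take about 5 s, so the stop request wins
            var done = await ProcessFileAsync(100, TimeSpan.FromMilliseconds(50), cts.Token);
            Console.WriteLine($"Completed {done} chunks");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stopped early because the worker was asked to stop");
        }
    }
}
```

Inside a KafkaFlow handler, the real token would be passed instead, i.e. context.ConsumerContext.WorkerStopped as the stopToken argument. The design choice here is cooperative cancellation: the work stops at a chunk boundary rather than being abandoned at an arbitrary point, which is why the check sits inside the loop and not only at the start.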

Joel

The issue is most likely related to max.poll.interval.ms. By default it is 5 minutes (300000 ms), meaning that if the consumer does not call poll within that time, it is considered failed and a rebalance occurs.

Consider increasing this value so your messages have enough time to be processed (the maximum allowed value is 24 hours, i.e. 86400000 ms). This way you can also avoid pausing and resuming the consumer. Additionally, you can use the cancellation token context.ConsumerContext.WorkerStopped to stop processing if a rebalance occurs.

To configure max.poll.interval.ms, you can do something like this:

    services.AddKafka(
        kafka => kafka
            .UseConsoleLog()
            .AddCluster(
                cluster => cluster
                    .WithBrokers(new[] { host })
                    .CreateTopicIfNotExists(topicName)
                    .AddConsumer(
                        consumer => consumer
                            .Topic(topicName)
                            .WithConsumerConfig(new ConsumerConfig { MaxPollIntervalMs = 1800000 }) // 30 min
                            .AddMiddlewares(
                                middlewares => middlewares
                                    .AddSerializer<ProtobufNetSerializer>()
                                    .AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
                            )
                    )
            )
    );
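The 1800000 in the configuration above is simply 30 minutes expressed in milliseconds. A small sketch that derives the value from a TimeSpan and rejects anything outside the 1 ms to 24 h range mentioned earlier could look like this; PollInterval and ToMaxPollIntervalMs are hypothetical helper names, not part of KafkaFlow or Confluent.Kafka.

```csharp
using System;

public static class PollInterval
{
    // librdkafka accepts max.poll.interval.ms from 1 to 86400000 (24 hours); the default is 300000 (5 minutes)
    public const int MaxAllowedMs = 86_400_000;

    // Converts a processing budget into the int value expected by ConsumerConfig.MaxPollIntervalMs,
    // throwing if it falls outside the allowed range.
    public static int ToMaxPollIntervalMs(TimeSpan budget)
    {
        var ms = budget.TotalMilliseconds;
        if (ms < 1 || ms > MaxAllowedMs)
        {
            throw new ArgumentOutOfRangeException(
                nameof(budget), budget, "max.poll.interval.ms must be between 1 ms and 24 hours");
        }
        return (int)ms;
    }

    public static void Main()
    {
        Console.WriteLine(ToMaxPollIntervalMs(TimeSpan.FromMinutes(30))); // 1800000
        Console.WriteLine(ToMaxPollIntervalMs(TimeSpan.FromHours(24)));   // 86400000
    }
}
```

It would be used as new ConsumerConfig { MaxPollIntervalMs = PollInterval.ToMaxPollIntervalMs(TimeSpan.FromMinutes(30)) }. Keep in mind that a larger interval also means a consumer that is genuinely stuck takes longer to be detected and removed from the group.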
