AmsterdamLuis

Reputation: 425

Kafka producer using multiple threads in Ktor

I have a Ktor application that sends events using RSocket.

These events now also need to be sent to Kafka, so I wrote the following code:

    suspend fun publishEvents(flow: SharedFlow<Pair<String, String>>) {
        val producer = KafkaProducer<String, String>(mapOf())
        flow.collect { 
            producer.send(ProducerRecord("topic", it.first, it.second))
        }
    }

The problem is that the producer is not able to keep up with more than 5k events per second (250 bytes each, one partition, default settings). Increasing the number of partitions did not help, nor did tweaking properties such as compression, linger, batch size, and acks. It seems I have hit some kind of hard limit.
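
For illustration, the kind of configuration being tweaked looks roughly like this (the broker address and all values are placeholders, not the exact settings used):

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.StringSerializer

    // Illustrative producer configuration; the actual broker address and values differ
    val props = mapOf<String, Any>(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",        // placeholder
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.COMPRESSION_TYPE_CONFIG to "lz4",                     // compression
        ProducerConfig.LINGER_MS_CONFIG to 20,                               // linger
        ProducerConfig.BATCH_SIZE_CONFIG to 64 * 1024,                       // batch size
        ProducerConfig.ACKS_CONFIG to "1"                                    // acks
    )
    val producer = KafkaProducer<String, String>(props)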

Modifying the code like this:

    suspend fun publishEvents(flow: SharedFlow<Pair<String, String>>) {
        val producer = KafkaProducer<String, String>(mapOf())
        val executorService = Executors.newFixedThreadPool(threads)
        flow.collect {
            executorService.submit {
                producer.send(ProducerRecord("topic", it.first, it.second))
            }
        }
    }

fixed the issue: with two threads, almost 10k events per second were sent. However, I am concerned about losing ordering guarantees for the same key because of the multiple threads. How can I change this to avoid that problem? And is there a producer setting to change the number of threads?

EDIT:

It turns out this solution does not work anyway: it introduces a memory leak. And the bottleneck does not seem to be in the producer, since tweaking the configuration makes no difference and no exception is thrown; it is probably related to coroutines / the shared flow. How can I fix the memory leak? Multiple threads were the only thing that improved throughput.

Upvotes: 0

Views: 643

Answers (1)

nomisRev

Reputation: 2136

The Java KafkaProducer's send method already sends the ProducerRecord asynchronously. It buffers records into batches and sends a batch once it reaches batch.size bytes or once linger.ms has elapsed, whichever comes first.

So to increase the throughput of the KafkaProducer, you can increase batch.size to allow more records to be sent at once. When doing so, you also need to increase the linger.ms property, which defaults to 0.

If linger.ms is left at 0, some batching will still occur (records that arrive while the previous request is in flight are batched together), but it is far from optimal.

Alongside these two configuration properties, you might also want to consider increasing max.in.flight.requests.per.connection to allow more than the default of 5 unacknowledged requests per connection at the same time.
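
A rough sketch of what that tuning could look like, applied to the original function (the broker address and values are placeholders; since send does not block, records can be produced directly from the collect block, without an executor):

    import kotlinx.coroutines.flow.SharedFlow
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer

    suspend fun publishEvents(flow: SharedFlow<Pair<String, String>>) {
        val props = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",     // placeholder
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.BATCH_SIZE_CONFIG to 128 * 1024,                   // larger batches
            ProducerConfig.LINGER_MS_CONFIG to 20                             // wait up to 20 ms to fill a batch
        )
        KafkaProducer<String, String>(props).use { producer ->
            flow.collect {
                // send() only appends the record to the producer's internal buffer and returns a Future;
                // the producer's background sender thread transmits the batches
                producer.send(ProducerRecord("topic", it.first, it.second))
            }
        }
    }

max.in.flight.requests.per.connection can be set the same way via ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION; note that when enable.idempotence is on (the default in recent client versions), it must stay at 5 or below.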

Upvotes: 1
