Adam Gwóźdź

Reputation: 1

Single partition consumer speed and throughput

I am developing a system that imports a huge amount of event data from a database into Apache Kafka; my Java application then processes this data and writes messages back to a Kafka topic.

I use Debezium with Kafka Connect to import the data into Kafka, and my consumer application then reads the Debezium output. This consumer must read from a single partition because I need the ordering guarantee from the database. The problem is that a single consumer can't keep up with the Debezium producer, so my messages fall further and further behind. Is it possible to somehow improve the speed of this consumer? Which configurations matter most for consumer speed and throughput?

My Debezium messages do not include schema information, so they are not large. My consumer is implemented with the Kafka Streams library and the following configuration:

Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
properties.put(StreamsConfig.POLL_MS_CONFIG, 50);
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);

My topology for this consumer is the following:

public Topology createTopology(String debeziumTopic, String otherTopic) {
    JsonDebeziumSerde jsonDebeziumSerde = new JsonDebeziumSerde();
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    streamsBuilder.stream(debeziumTopic, Consumed.with(Serdes.String(), jsonDebeziumSerde))
        .foreach((k, v) -> {
            try {
                String id = v.get("ID").textValue();
                kafkaTemplate.send(otherTopic, id, v);
            } catch (NullPointerException ex) {
                log.warn(debeziumTopic + " has empty message");
            }
        });

    return streamsBuilder.build();
}

My broker configurations:

auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
zookeeper.connection.timeout.ms=1000
log.retention.hours=1
num.partitions=10
delete.topic.enable=true

Upvotes: 0

Views: 1048

Answers (1)

Karsten Schnitter

Reputation: 301

It is hard to give general advice on performance, but I can share some of my experience. I run a Kafka Streams application that easily reaches a throughput of over 100k messages per second on a single thread; I am not sure how that compares to your use case. To get there, I inspected the application with VisualVM and its sampler, but any profiler will do. This shows you where the bottlenecks in your application are.

For me the bottleneck was JSON serialization and deserialization, which I improved by switching to Protocol Buffers. I am not sure whether that is an option in your case; Avro might be a better fit for you.

Another major improvement was enabling zstd compression on the topics, which greatly reduced the data volume and network load, and sped up Kafka Streams considerably.
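If you want to try compression from the Kafka Streams side, any property with the `producer.` prefix is handed to the internal producer, so you can enable zstd there without touching the topic configuration. A minimal sketch, using plain string keys so it stands alone (this corresponds to `StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG)` in the Kafka API; the class name is mine):

```java
import java.util.Properties;

public class CompressionConfig {

    // Returns a copy of the given Streams configuration with zstd
    // compression enabled on the embedded producer. The "producer."
    // prefix routes the setting to Kafka Streams' internal producer.
    public static Properties withZstd(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("producer.compression.type", "zstd");
        return props;
    }

    public static void main(String[] args) {
        Properties streamsProps = withZstd(new Properties());
        System.out.println(streamsProps.getProperty("producer.compression.type"));
    }
}
```

Note that zstd requires brokers and clients on Kafka 2.1.0 or later.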

Finally, I wonder why you use the KafkaTemplate to write to the output topic. I would have expected a DSL expression like this:

streamsBuilder.stream(debeziumTopic, Consumed.with(Serdes.String(), jsonDebeziumSerde))
    .selectKey((k, v) -> {
        var id = v.get("ID");
        if (id != null) {
            return id.textValue();
        }
        log.warn(debeziumTopic + " has empty message");
        return null;
    })
    .filter((k, v) -> k != null)
    .to(otherTopic, Produced.with(Serdes.String(), jsonDebeziumSerde));

I am not sure about the performance implications of this approach, but it is more idiomatic Kafka Streams, so it may well be more efficient.
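Since you asked specifically about consumer configuration: with a single partition you cannot parallelize, so the remaining levers are the fetch settings of the underlying consumer. Kafka Streams forwards any property with the `consumer.` prefix to its internal consumer. A sketch with illustrative values that are only starting points, not tested against your workload (the keys are the standard consumer configs `max.poll.records`, `fetch.min.bytes`, and `fetch.max.wait.ms`; the class name is mine):

```java
import java.util.Properties;

public class ConsumerTuning {

    // Adds illustrative fetch-tuning overrides to a Streams configuration.
    // The "consumer." prefix hands these to the embedded Kafka consumer.
    public static Properties withFetchTuning(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Hand more records to the application per poll() call.
        props.put("consumer.max.poll.records", "2000");
        // Let the broker accumulate up to 1 MiB before answering a fetch,
        // but wait at most 100 ms for that much data to arrive.
        props.put("consumer.fetch.min.bytes", "1048576");
        props.put("consumer.fetch.max.wait.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withFetchTuning(new Properties());
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Raising `fetch.min.bytes` trades a little latency for larger, cheaper batches; measure with your profiler before and after, since the right values depend on message size and processing cost.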

Upvotes: 1
