rvit34

Reputation: 2116

High memory consumption on Kafka consumer

Recently I found that Kafka consumers require a lot of RAM.

For a test I started a single-threaded consumer locally that listens to a single topic. The topic has 4 partitions, and Kafka has only one broker.

From the producer I sent only 10 small messages (around 11:44:30 PM; see the attached image). Since then nobody has sent any more messages to this topic, yet the diagram shows memory consumption constantly growing while the consumer polls. The line keeps climbing until GC kicks in. The consumer just sends poll requests and receives nothing back, but it still allocates a lot of memory.

I think this is a problem. I tried some tuning, i.e. configuring params such as FETCH_MAX_BYTES_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG, and MAX_POLL_RECORDS_CONFIG, but nothing actually helped.
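
For reference, the same idle-poll growth reproduces without Spring. A minimal sketch using the plain Kafka client (topic name and bootstrap address taken from the SSCCE below; the loop and timeout are my own illustration, not part of the original app):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.Properties;

public class PlainConsumerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "local-test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("local-test-topic"));
            while (true) {
                // Even with an idle topic, every poll allocates fetch-request
                // and response objects, which show up as the growing line.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                System.out.println("records received: " + records.count());
            }
        }
    }
}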

SSCCE:

KafkaConsumerConfig:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1); // single consumer thread
        factory.setConsumerFactory(consumerFactory());
        // Offsets are committed manually via the Acknowledgment argument in the listener.
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

    private ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    private Map<String, Object> consumerProperties() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "local-test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        //props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1000000); //1mb
        //props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        //props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256000); //256kb
        return props;
    }

}

KafkaDataListener:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class KafkaDataListener {

    @KafkaListener(topics = "local-test-topic", containerFactory = "kafkaListenerContainerFactory")
    public void consumeEvent(byte[] eventData, final Acknowledgment ack) {
        try {
            System.out.println("consumer received message:" + Arrays.toString(eventData));
        } finally {
            ack.acknowledge();
        }
    }
}

Main App:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

build.gradle:

group 'org.test.kafka'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {

    compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '1.5.4.RELEASE'
    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.2.2.RELEASE'
}

output:

2018-02-02 23:38:51.506  INFO 14888 --- [           main] org.kafka.tests.App                      : Starting App on so-workstation with PID 14888 (/home/.../projects/custom/kafka-consumer-test/out/production/classes started by ... in /home/.../projects/custom/kafka-consumer-test)
2018-02-02 23:38:51.508  INFO 14888 --- [           main] org.kafka.tests.App                      : No active profile set, falling back to default profiles: default
2018-02-02 23:38:51.591  INFO 14888 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@281e3708: startup date [Fri Feb 02 23:38:51 MSK 2018]; root of context hierarchy
2018-02-02 23:38:52.028  INFO 14888 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2d249aa5] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-02-02 23:38:52.205  INFO 14888 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-02-02 23:38:52.207  INFO 14888 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-02-02 23:38:52.221  INFO 14888 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = local-test-consumer-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-02 23:38:52.225  INFO 14888 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = local-test-consumer-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-02 23:38:52.258  INFO 14888 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.2.0
2018-02-02 23:38:52.258  INFO 14888 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 576d93a8dc0cf421
2018-02-02 23:38:52.268  INFO 14888 --- [           main] org.kafka.tests.App                      : Started App in 1.056 seconds (JVM running for 1.337)
2018-02-02 23:38:52.308  INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka:9092 (id: 2147483646 rack: null) for group local-test-consumer-group.
2018-02-02 23:38:52.312  INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group local-test-consumer-group
2018-02-02 23:38:52.313  INFO 14888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2018-02-02 23:38:52.313  INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group local-test-consumer-group
2018-02-02 23:38:52.319  INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group local-test-consumer-group with generation 1
2018-02-02 23:38:52.320  INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [local-test-topic-3, local-test-topic-2, local-test-topic-1, local-test-topic-0] for group local-test-consumer-group
2018-02-02 23:38:52.328  INFO 14888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[local-test-topic-3, local-test-topic-2, local-test-topic-1, local-test-topic-0]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]

Does anybody know how to tune it properly? Or is memory usage perhaps optimized in a later version?

Kafka server: 0.10.2.0
Kafka client: 0.10.2.0

See the images:

[kafka 0.10.2 memory consumption diagram]
[kafka 0.10.2 memory allocation per thread diagram]

UPD:

For the Kafka 1.0.0 consumer (spring-kafka 2.1.2) the memory usage diagram looks a bit better. The consumption line now grows more slowly than before, but the RMI TCP Connection thread now consumes even more memory than the Kafka consumer thread.

[kafka 1.0.0 memory consumption diagram with default settings]
[kafka 1.0.0 memory allocation per thread diagram with default settings]

Moreover, the consumer's params do seem to affect memory usage. With FETCH_MAX_BYTES_CONFIG = 1 MB and MAX_PARTITION_FETCH_BYTES_CONFIG = 256 KB, consumption is lower (see the snippet after the diagram below).

[kafka 1.0.0 memory consumption diagram with custom settings]
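
These are the overrides used for that last diagram; they simply replace the commented-out lines in consumerProperties() above (values as stated, everything else left at defaults):

    // Cap the total fetch response at 1 MB (default is 50 MB).
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1000000);
    // Cap each partition's share of the fetch at 256 KB (default is 1 MB).
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256000);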

Upvotes: 8

Views: 14584

Answers (1)

Marcel

Reputation: 1234

One cause of this "sawtooth pattern" is the Java VisualVM application itself. It asks your JVM for information every second, and the JVM creates a lot of objects to answer each request. Those objects become obsolete as soon as the data has been sent to VisualVM, so they can be garbage collected easily.

Try decreasing the polling rate in the VisualVM settings; that should minimize the effect.
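
To illustrate why the monitoring itself generates garbage, here is a minimal sketch (my own illustration, not VisualVM's actual code) that samples heap usage through the standard JMX memory bean the way a profiler would; every sample allocates a fresh MemoryUsage snapshot that immediately becomes collectible:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class HeapPollDemo {
    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        while (true) {
            // getHeapMemoryUsage() returns a new MemoryUsage object on every
            // call, so polling once per second steadily creates short-lived
            // garbage, the same effect a one-second profiler refresh has.
            MemoryUsage heap = memoryBean.getHeapMemoryUsage();
            System.out.println("heap used: " + heap.getUsed() + " bytes");
            Thread.sleep(1000);
        }
    }
}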

Upvotes: 4
