Reputation: 2116
Recently I found that kafka consumers require a lot of ram.
For tests I've just started locally a single-threaded consumer that listens a single topic.Topic has 4 partitions. Kafka has only one broker.
From producer I sent only 10 small messages (it was around 11:44:30 PM, see at the image I attached at the link). Since then nobody has sent any more messages to this topic. From then I've been seeing on the diagram with constantly growing memory consumption during the consumer polling work. Line is growing untill GC is not called. Consumer just sends poll-requests and returns nothing but require a lot of memory.
I think it's problem. I tried to do some tuning, i.e. configuring some params as FETCH_MAX_BYTES_CONFIG/MAX_PARTITION_FETCH_BYTES_CONFIG/MAX_POLL_RECORDS_CONFIG but nothing actually worked out.
SSCCE:
KafkaConsumerConfig:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
private ConsumerFactory<String, byte[]> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
private Map<String, Object> consumerProperties() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "local-test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1000000); //1mb
//props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
//props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256000); //256kb
return props;
}
}
KafkaDataListener
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class KafkaDataListener {
@KafkaListener(topics = "local-test-topic", containerFactory = "kafkaListenerContainerFactory")
public void consumeEvent(byte[] eventData, final Acknowledgment ack) {
try {
System.out.println("consumer received message:" + Arrays.toString(eventData));
}finally {
ack.acknowledge();
}
}
}
Main App
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
build.gradle:
group 'org.test.kafka'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '1.5.4.RELEASE'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.2.2.RELEASE'
}
output:
2018-02-02 23:38:51.506 INFO 14888 --- [ main] org.kafka.tests.App : Starting App on so-workstation with PID 14888 (/home/.../projects/custom/kafka-consumer-test/out/production/classes started by ... in /home/.../projects/custom/kafka-consumer-test)
2018-02-02 23:38:51.508 INFO 14888 --- [ main] org.kafka.tests.App : No active profile set, falling back to default profiles: default
2018-02-02 23:38:51.591 INFO 14888 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@281e3708: startup date [Fri Feb 02 23:38:51 MSK 2018]; root of context hierarchy
2018-02-02 23:38:52.028 INFO 14888 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2d249aa5] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-02-02 23:38:52.205 INFO 14888 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-02-02 23:38:52.207 INFO 14888 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-02-02 23:38:52.221 INFO 14888 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = local-test-consumer-group
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-02 23:38:52.225 INFO 14888 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = local-test-consumer-group
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-02 23:38:52.258 INFO 14888 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.2.0
2018-02-02 23:38:52.258 INFO 14888 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 576d93a8dc0cf421
2018-02-02 23:38:52.268 INFO 14888 --- [ main] org.kafka.tests.App : Started App in 1.056 seconds (JVM running for 1.337)
2018-02-02 23:38:52.308 INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator kafka:9092 (id: 2147483646 rack: null) for group local-test-consumer-group.
2018-02-02 23:38:52.312 INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group local-test-consumer-group
2018-02-02 23:38:52.313 INFO 14888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2018-02-02 23:38:52.313 INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group local-test-consumer-group
2018-02-02 23:38:52.319 INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group local-test-consumer-group with generation 1
2018-02-02 23:38:52.320 INFO 14888 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [local-test-topic-3, local-test-topic-2, local-test-topic-1, local-test-topic-0] for group local-test-consumer-group
2018-02-02 23:38:52.328 INFO 14888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[local-test-topic-3, local-test-topic-2, local-test-topic-1, local-test-topic-0]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
consumer received message:[...]
Does anybody know how to tune it properly? Or maybe on some higher version memory usage is optimized?
Kafka server: 0.10.2.0 Kafka client: 0.10.2.0
See the images:
UPD:
For kafka consumer 1.0.0 (spring kafka 2.1.2) memory usage diagram looks a bit better. Now the consumption line is growing not so fast as before. But now RMI TCP Connection thread consumes even more memory that kafka consumer thread.
Moreover it seems that consumer's params are getting affects on memory usage.
With consumer's params FETCH_MAX_BYTES_CONFIG = 1mb
and MAX_PARTITION_FETCH_BYTES_CONFIG = 256kb
consumption gets lower.
Upvotes: 8
Views: 14584
Reputation: 1234
One cause of this "sawtooth pattern" is the application Java VisualVM itself. It is asking your JVM every second for information. The JVM then creates a lot of object for this process, which gets obsolete after sending to VisualVM and can therefore easily garbage collected.
Try to decrease the polling rate in the settings of VisualVM. It should minimize the effect.
Upvotes: 4