Shades88

Reputation: 8360

Kafka producer huge memory usage (leak?)

We have a 3 broker Kafka 0.10.1.0 deployment in production. There are some applications which have Kafka Producers embedded in them which send application logs to a topic. This topic has 10 partitions with replication factor of 3.

We are observing that memory usage on some of these application servers keeps shooting through the roof intermittently. After taking a heap dump we found that the top suspects were:

**org.apache.kafka.common.network.Selector -**

occupies 352,519,104 (24.96%) bytes. The memory is accumulated in one instance of "byte[]" loaded by "<system class loader>".

**org.apache.kafka.common.network.KafkaChannel -**

occupies 352,527,424 (24.96%) bytes. The memory is accumulated in one instance of "byte[]" loaded by "<system class loader>".

Both of these were holding about 352MB of space each. There were 3 such instances, so together they were consuming about 1.2GB of memory.

Now regarding producer usage: not a huge amount of logs is being sent to the Kafka cluster, about 200 msgs/sec. Only one producer object is used throughout the application, and the asynchronous send() is used, roughly as sketched below.
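For clarity, this is a minimal sketch of the usage pattern; the class name, topic name and callback are placeholders, not our actual code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogPublisher {
    private final Producer<String, String> producer;

    public LogPublisher(Properties props) {
        // A single producer instance is created once and shared across the application.
        this.producer = new KafkaProducer<>(props);
    }

    public void publish(String logLine) {
        // send() is asynchronous: the record is appended to the producer's internal
        // buffer and the "kafka-producer-network-thread" flushes batches later.
        producer.send(new ProducerRecord<>("application-logs", logLine),
                (metadata, exception) -> {
                    if (exception != null) {
                        // With acks=0 and retries=0 there is no retry; the record is dropped.
                        System.err.println("Failed to send log: " + exception.getMessage());
                    }
                });
    }

    public void close() {
        producer.close();
    }
}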

What could be the cause of such huge memory usage? Is this some sort of memory leak in this specific Kafka version?

Here's the Kafka producer config being used in production:

kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092
kafka.acks=0
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.max.block.ms=1000
kafka.request.timeout.ms=1000
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.compression.type=gzip
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/data/kafka/kafka-server-truststore.jks
kafka.ssl.truststore.password=XXXXXX
kafka.linger.ms=300
logger.level=INFO
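For context, these properties are loaded into the producer roughly as follows; the handling of the "kafka." prefix is a simplified sketch, not the exact application code:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerFactory {
    public static KafkaProducer<String, String> fromFile(String path) throws IOException {
        Properties raw = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            raw.load(in);
        }
        Properties producerProps = new Properties();
        for (String name : raw.stringPropertyNames()) {
            // Strip the application-level "kafka." prefix so the keys match the
            // official producer config names ("acks", "linger.ms", "compression.type", ...).
            if (name.startsWith("kafka.")) {
                producerProps.put(name.substring("kafka.".length()), raw.getProperty(name));
            }
        }
        return new KafkaProducer<>(producerProps);
    }
}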

Here's a section from the GC log showing the Kafka network thread's allocations:

<allocation-stats totalBytes="3636833992" >
  <allocated-bytes non-tlh="3525405200" tlh="111428792" />
  <largest-consumer threadName="kafka-producer-network-thread | producer-1" threadId="0000000033A26700" bytes="3525287448" />
</allocation-stats>
<gc-op id="591417" type="scavenge" timems="21.255" contextid="591414" timestamp="2018-09-19T17:55:32.938">
  <scavenger-info tenureage="14" tenuremask="4000" tiltratio="89" />
  <memory-copied type="nursery" objects="61155" bytes="6304384" bytesdiscarded="3968416" />
  <memory-copied type="tenure" objects="1199" bytes="230312" bytesdiscarded="38656" />
  <finalization candidates="461" enqueued="316" />
  <ownableSynchronizers candidates="18" cleared="5" />
  <references type="soft" candidates="231" cleared="0" enqueued="0" dynamicThreshold="23" maxThreshold="32" />
  <references type="weak" candidates="20" cleared="2" enqueued="1" />
  <references type="phantom" candidates="2" cleared="0" enqueued="0" />
</gc-op>
<gc-end id="591418" type="scavenge" contextid="591414" durationms="21.715" usertimems="11.640" systemtimems="0.125" timestamp="2018-09-19T17:55:32.939" activeThreads="64">
  <mem-info id="591419" free="4226106664" total="6049234944" percent="69">
    <mem type="nursery" free="3855164752" total="4294967296" percent="89">
      <mem type="allocate" free="3855164752" total="3865444352" percent="99" />
      <mem type="survivor" free="0" total="429522944" percent="0" />
    </mem>
    <mem type="tenure" free="370941912" total="1754267648" percent="21">
      <mem type="soa" free="362646600" total="1740233728" percent="20" />
      <mem type="loa" free="8295312" total="14033920" percent="59" />
    </mem>
    <pending-finalizers system="315" default="1" reference="1" classloader="0" />
    <remembered-set count="4110" />
  </mem-info>
</gc-end>
<cycle-end id="591420" type="scavenge" contextid="591414" timestamp="2018-09-19T17:55:32.940" />
<allocation-satisfied id="591421" threadId="0000000033A26700" bytesRequested="352518920" />
<af-end id="591422" timestamp="2018-09-19T17:55:32.962" />
<exclusive-end id="591423" timestamp="2018-09-19T17:55:32.962" durationms="45.987" />

Upvotes: 5

Views: 8966

Answers (1)

Subash

Reputation: 895

There can be many reasons. But if you need to optimise, you can try out the following:

1. replica.fetch.max.bytes - the buffer size for each partition. Make sure that the number of partitions multiplied by the size of the largest message does not exceed the available memory. The same applies to consumers: fetch.message.max.bytes and max.partition.fetch.bytes set the maximum amount of data per partition the server will return. Checkpointing of missing data can be done with replica.high.watermark.checkpoint.interval.ms, which will tune the throughput.

2. batch.size (shouldn't exceed the available memory) and linger.ms (sets the maximum time to buffer data when sending asynchronously); see the sketch below.
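For point 2, a hedged sketch of setting batch.size, linger.ms and the producer's total buffer explicitly; the values shown are the library defaults (plus linger.ms=300 from the question), not tuned recommendations:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BufferTuning {
    public static Properties bufferSettings() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);         // per-partition batch size in bytes (default 16 KB)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 300);            // max time to buffer records before a batch is sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);  // total memory the producer may use for buffering (default 32 MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // upper bound on a single request (default 1 MB)
        return props;
    }
}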

Upvotes: 2
