Gary Gershon

Reputation: 83

Kafka Producer 0.9 performance issue with small messages

We are observing very poor performance from the Java Kafka Producer 0.9 client when sending small messages. The messages are not being accumulated into larger request batches, so each small record is sent separately.

What is wrong with our client configuration? Or is this some other issue?


We are using Kafka Client 0.9.0.0. We did not see any related issues in the fixed or unresolved lists for the unreleased 0.9.0.1 or 0.9.1 versions, so we are focusing on our client configuration and server instance.

We understand that linger.ms should cause the client to accumulate records into a batch.

We set linger.ms to 10 (and also tried 100 and 1000), but these did not result in the batch accumulating records. With a record size of about 100 bytes and a batch buffer size of 16K (batch.size = 16384), we would have expected about 160 messages to be sent in a single request.

The trace at the client seems to indicate that the partition may be full, despite having allocated a fresh Bluemix Messaging Hub (Kafka Server 0.9) service instance. The test client is sending multiple messages in a loop with no other I/O.
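The send loop is essentially the following (trimmed for this post; producer is the KafkaProducer<byte[], byte[]> configured from the properties shown further below, and the enclosing method declares throws Exception):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.producer.ProducerRecord;

    for (int i = 0; i < 100; i++) {
        String value = String.format(
                "Kafka 0.9 Java Client Record Test Message %05d %s",
                i, java.time.OffsetDateTime.now());
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                "mytopic",
                "records".getBytes(StandardCharsets.UTF_8),
                value.getBytes(StandardCharsets.UTF_8));
        // Wait for the broker to acknowledge each record before sending the next.
        producer.send(record).get();
    }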


The log shows a repeating sequence with a suspect line: "Waking up the sender since topic mytopic partition 0 is either full or getting a new batch".

The newly allocated partition should be essentially empty in our test case, so why would the producer client be getting a new batch for every record?

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating a new 16384 byte message buffer for topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Received produce response from node 0 with correlation id 11  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch  - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Send returned metadata: Topic='mytopic', Partition=0, Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

Log entries like the above repeat for each record sent.

We provided the following properties file:

2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient  - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     acks=-1
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     client.id=ExploreProducer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     security.protocol=SASL_SSL

Plus we added linger.ms=10 in code.
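In code, the setup amounts to roughly this (a sketch; the file name is the one logged above):

    import java.io.FileInputStream;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream("kafka-producer.properties")) {
        props.load(in);
    }
    // Added in code: wait up to 10 ms for more records before sending,
    // so that small records can accumulate into one batch.
    props.put(ProducerConfig.LINGER_MS_CONFIG, "10");

    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);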

The Kafka client logs the expanded/merged configuration list (including the linger.ms setting):

2015-12-10 15:14:37,970 312  [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    acks = -1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10

The Kafka metrics after sending 100 records:

Duration for 100 sends: 8787 ms. Sent 7687 bytes.  
    batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]  
    batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]  
    buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]  
    buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]  
    buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]  
    bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]  
    byte-rate = 291.8348916277093 []  
    compression-rate = 0.0 []  
    compression-rate-avg = 0.0 [The average compression rate of record batches.]  
    connection-close-rate = 0.0 [Connections closed per second in the window.]  
    connection-count = 2.0 [The current number of active connections.]  
    connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]  
    incoming-byte-rate = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]  
    io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]  
    io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
    metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]  
    network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]  
    outgoing-byte-rate = 451.2298783403283 []  
    produce-throttle-time-avg = 0.0 [The average throttle time in ms]  
    produce-throttle-time-max = 0.0 [The maximum throttle time in ms]  
    record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]  
    record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]  
    record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]  
    record-retry-rate = 0.0 []  
    record-send-rate = 2.65611304417116 [The average number of records sent per second.]  
    record-size-avg = 97.87 [The average record size]  
    record-size-max = 98.0 [The maximum record size]  
    records-per-request-avg = 1.0 [The average number of records per request.]  
    request-latency-avg = 0.0 [The average request latency in ms]  
    request-latency-max = 74.0 []  
    request-rate = 2.6468892499606897 [The average number of requests sent per second.]  
    request-size-avg = 42.0 [The average size of all requests in the window..]  
    request-size-max = 170.0 [The maximum size of any request sent in the window.]  
    requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]  
    response-rate = 2.651196976060479 [The average number of responses received per second.]  
    select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]  
    waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]  

Thanks

Upvotes: 1

Views: 5167

Answers (1)

Gary Gershon

Reputation: 83

Guozhang Wang on the Kafka Users mailing list identified the problem after reviewing our application code:

Guozhang,

Yes - you identified the problem!

We had inserted the .get() for debugging, but didn’t think of the (huge!) side-effects.

Using the async callback works perfectly well.

We are now able to send 100,000 records in 14 sec from a laptop to the Bluemix cloud, ~1000x faster.

Thank you very much!

Gary


On Dec 13, 2015, at 2:48 PM, Guozhang Wang wrote:

Gary,

You are calling "kafkaProducer.send(record).get();" for each message; the get() call blocks until the send completes, which effectively synchronizes all message sends by waiting for the ACK of each message before sending the next one, hence no batching.

You can try using "send(record, callback)" for async sending and letting the callback handle errors from the returned metadata.

Guozhang
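For anyone hitting the same issue, the change amounts to the following (a sketch using our record variable from the loop above; error handling is up to you):

    // Before: one produce request per record, because get() blocks on the broker ACK.
    producer.send(record).get();

    // After: enqueue the record and return immediately; linger.ms/batch.size can now
    // accumulate records into batches, and the callback reports per-record errors.
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace(); // or log and retry as appropriate
        }
    });

With the callback form, the accumulator finally sees more than one record at a time, which is what restored batching in our test.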

Upvotes: 5
