Arkadiy Verman

Reputation: 694

Spark Streaming to Kafka slow performance

I'm writing a Spark Streaming job that reads data from Kafka, makes some changes to the records, and sends the results to another Kafka cluster.

The job's performance seems very slow: the processing rate is about 70,000 records per second. Sampling shows that 30% of the time is spent reading and processing the data, and the remaining 70% is spent sending it to Kafka.

I've tried tweaking the Kafka configuration, adding memory, and changing batch intervals, but the only change that helps is adding more cores.

profiler: profiler snapshot

Spark job details:

max.cores 30
driver memory 6G
executor memory 16G
batch.interval 3 minutes
ingress rate 180,000 messages per second
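
For reference, a minimal sketch of how these settings might be wired up in code (the app name here is a placeholder, and driver/executor memory are normally passed on spark-submit rather than set programmatically):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// placeholder app name; memory is passed on spark-submit,
// e.g. --driver-memory 6g --executor-memory 16g
val conf = new SparkConf()
  .setAppName("kafka-to-kafka-ingest")
  .set("spark.cores.max", "30")

val ssc = new StreamingContext(conf, Minutes(3)) // 3-minute batch interval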

Producer Properties (I've tried different variations)

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

def buildProducerKafkaProperties: Properties = {
  val producerConfig = new Properties
  producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  producerConfig.put(ProducerConfig.ACKS_CONFIG, "all")
  producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "200000")
  producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "2000")
  producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
  producerConfig.put(ProducerConfig.RETRIES_CONFIG, "0")
  producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "13421728")
  producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, "13421728")
  producerConfig
}
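
Since most of the time goes into the send path, one thing to experiment with is a more throughput-oriented variant of these properties. This is only a sketch: acks=1 and lz4 trade durability and compression ratio for speed, so they are only acceptable if your delivery guarantees allow it, and the exact numbers are starting points, not tuned values.

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

// throughput-oriented variant to experiment with (a sketch, not a drop-in fix);
// assumes key/value serializers are configured elsewhere, as in the original
def buildThroughputProducerProperties: Properties = {
  val p = new Properties
  p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  p.put(ProducerConfig.ACKS_CONFIG, "1")                  // leader-only acks instead of "all"
  p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")    // much cheaper on CPU than gzip
  p.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144")       // 256 KB batches
  p.put(ProducerConfig.LINGER_MS_CONFIG, "50")            // batches usually fill on their own at this rate
  p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728") // 128 MB producer buffer
  p
}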

Sending code

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream
  .foreachRDD(rdd => {
    // capture the Kafka offsets for this micro-batch before transforming the RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd
      .map(consumerRecord => doSomething(consumerRecord))
      .foreachPartition(partitionIter => {
        // one producer per executor, obtained from the broadcast sink
        val producer = kafkaSinkBroadcast.value
        partitionIter.foreach(row => {
          producer.send(kafkaTopic, row)
          producedRecordsAcc.add(1)
        })
      })

    // commit the consumed offsets once the batch has been sent
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
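
The question doesn't show how kafkaSinkBroadcast is built; the usual pattern is a serializable wrapper that creates the KafkaProducer lazily on each executor so it is reused across partitions and batches. A rough sketch, assuming String keys/values:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Broadcastable Kafka sink (sketch): the producer is created lazily on each
// executor after deserialization and reused for all partitions and batches.
// Assumes key/value serializers (e.g. StringSerializer) are set in `config`.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer: KafkaProducer[String, String] = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink =
    new KafkaSink(() => {
      val producer = new KafkaProducer[String, String](config)
      sys.addShutdownHook(producer.close()) // flush and close on executor shutdown
      producer
    })
}

// usage on the driver:
// val kafkaSinkBroadcast = ssc.sparkContext.broadcast(KafkaSink(buildProducerKafkaProperties))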

Versions

Spark Standalone cluster 2.3.1
Destination Kafka cluster 1.1.1
Kafka topic has 120 partitions 

Can anyone suggest how to increase sending throughput?


Update Jul 2019

Ingest rate: 150k messages per second; each message has about 100 columns.

main settings:

spark.cores.max = 30  # the cores are balanced across all the workers
spark.streaming.backpressure.enabled = true
ob.ingest.batch.duration = 3 minutes
 

I've tried using rdd.repartition(30), but it made the execution about 10% slower.

Thanks

Upvotes: 1

Views: 1895

Answers (1)

Ajay Ahuja

Reputation: 1313

Try using repartition as below:

val numPartitions = (number of executors * number of executor cores)
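
For example, with the 30 cores from the question and assuming they are split as, say, 10 executors with 3 cores each (the exact split isn't stated):

val numPartitions = 10 * 3 // 10 executors * 3 cores each = 30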

stream
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd
      // repartition after capturing the offsets: HasOffsetRanges is only
      // available on the original KafkaRDD, not on a shuffled one
      .repartition(numPartitions)
      .map(consumerRecord => doSomething(consumerRecord))
      .foreachPartition(partitionIter => {
        val producer = kafkaSinkBroadcast.value
        partitionIter.foreach(row => {
          producer.send(kafkaTopic, row)
          producedRecordsAcc.add(1)
        })
      })

    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

This should spread the send work across all available executor cores and improve throughput.

Hope this will help.

Upvotes: 0
