Reputation: 694
I'm writing a Spark Streaming job that reads data from Kafka, makes some changes to the records, and sends the results to another Kafka cluster.
The job seems very slow: the processing rate is about 70,000 records per second. Sampling shows that 30% of the time is spent reading and processing the data, and the remaining 70% is spent sending data to Kafka.
I've tried tweaking the Kafka configuration, adding memory, and changing batch intervals, but the only change that helps is adding more cores.
Spark job details:
max.cores 30
driver memory 6G
executor memory 16G
batch.interval 3 minutes
ingress rate 180,000 messages per second
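For reference, a minimal sketch of how these settings map onto the job setup (app and variable names are placeholders; memory settings are normally passed at submit time):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Placeholder names; driver/executor memory are usually passed via spark-submit
// (--driver-memory 6G --executor-memory 16G) rather than set in code.
val conf = new SparkConf()
  .setAppName("kafka-to-kafka-job")
  .set("spark.cores.max", "30")                    // max.cores 30
val ssc = new StreamingContext(conf, Minutes(3))   // batch.interval 3 minutes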
Producer properties (I've tried different variations):
def buildProducerKafkaProperties: Properties = {
  val producerConfig = new Properties
  producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  producerConfig.put(ProducerConfig.ACKS_CONFIG, "all")                // wait for all in-sync replicas
  producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "200000")       // per-partition batch size, in bytes
  producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "2000")          // wait up to 2 s to fill a batch
  producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
  producerConfig.put(ProducerConfig.RETRIES_CONFIG, "0")
  producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "13421728")  // ~13 MB total producer buffer
  producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, "13421728")    // TCP send buffer, in bytes
  producerConfig
}
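For comparison, a throughput-oriented variant of the producer config is sketched below; acks=all, gzip compression, and a 2-second linger all trade throughput for durability and latency. The method name and the exact values are assumptions to experiment with, not measured recommendations.
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

// Hypothetical throughput-oriented variant; values are starting points to test, not tuned results.
def buildThroughputProducerKafkaProperties: Properties = {
  val producerConfig = new Properties
  producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, destKafkaBrokers)
  producerConfig.put(ProducerConfig.ACKS_CONFIG, "1")                   // only the partition leader acknowledges
  producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")     // lighter on CPU than gzip
  producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, "524288")        // 512 KB batches
  producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "50")             // batches fill quickly at this rate anyway
  producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728")  // 128 MB producer buffer
  producerConfig
}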
Sending code
stream
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
      .map(consumerRecord => doSomething(consumerRecord))
      .foreachPartition(partitionIter => {
        // One producer per partition, shared via broadcast
        val producer = kafkaSinkBroadcast.value
        partitionIter.foreach(row => {
          producer.send(kafkaTopic, row)
          producedRecordsAcc.add(1)
        })
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })
  })
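For reference, the Spark + Kafka integration guide commits offsets from the driver once per batch, after the output action, rather than inside every partition. A minimal sketch of that shape, reusing the names from the code above (kafkaSinkBroadcast, doSomething, kafkaTopic, producedRecordsAcc):
// Sketch: same pipeline, but offsets are committed once per micro-batch on the driver
// (the pattern shown in the Spark + Kafka integration guide), not per partition.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
    .map(consumerRecord => doSomething(consumerRecord))
    .foreachPartition { partitionIter =>
      val producer = kafkaSinkBroadcast.value
      partitionIter.foreach { row =>
        producer.send(kafkaTopic, row)
        producedRecordsAcc.add(1)
      }
    }
  // Runs on the driver after the output action for this batch.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}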
Versions
Spark Standalone cluster 2.3.1
Destination Kafka cluster 1.1.1
The destination Kafka topic has 120 partitions
Can anyone suggest how to increase sending throughput?
Update Jul 2019
Load: 150k messages per second, each message with about 100 columns.
main settings:
spark.cores.max = 30  # the cores are balanced across all the workers
spark.streaming.backpressure.enabled = true
ob.ingest.batch.duration = 3 minutes
I've tried rdd.repartition(30), but it made execution slower by ~10%.
Thanks
Upvotes: 1
Views: 1895
Reputation: 1313
Try using repartition as below:
// numExecutors and coresPerExecutor come from your deployment settings
val numPartitions = numExecutors * coresPerExecutor

stream
  .repartition(numPartitions)
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
      .map(consumerRecord => doSomething(consumerRecord))
      .foreachPartition(partitionIter => {
        val producer = kafkaSinkBroadcast.value
        partitionIter.foreach(row => {
          producer.send(kafkaTopic, row)
          producedRecordsAcc.add(1)
        })
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })
  })
This should give you better sending throughput.
Hope this helps.
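For reference, a sketch of deriving that number at runtime instead of hard-coding it; it assumes a StreamingContext named ssc, and on a standalone cluster defaultParallelism falls back to the total executor core count:
// Sketch: derive the repartition target from the running context
// (ssc is assumed to be the job's StreamingContext).
val numPartitions = ssc.sparkContext.defaultParallelism  // total cores on standalone by default

stream
  .repartition(numPartitions)
  .foreachRDD(rdd => {
    // ... same producing and commit logic as above ...
  })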
Upvotes: 0