Reputation: 657
I have a Spark dataframe with two columns, 'keyCol' and 'valCol'. The dataframe is huge, nearly 100 million rows. I want to write/produce the dataframe to a Kafka topic in mini batches, i.e. 10000 records per minute. The Spark job that creates this dataframe runs once per day.
How can I implement writing in mini batches of 10000 records per minute in the code below, or is there a better/more efficient way to do this?
spark_df.foreachPartition(partitions -> {
    // One producer per partition
    Producer<String, String> producer = new KafkaProducer<String, String>(allKafkaParamsMapObj);
    while (partitions.hasNext()) {
        Row row = partitions.next();
        producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // Callback code goes here
            }
        });
    }
    producer.close();
});
Upvotes: 1
Views: 601
Reputation: 4045
You can use the grouped(10000) function as below and sleep the thread for a minute after each batch:
spark_df.foreachPartition(partition => {
  val producer = new KafkaProducer[String, String](allKafkaParamsMapObj)
  partition.grouped(10000).foreach((rowSeq: Seq[Row]) => { // send 10000 records per batch
    rowSeq.foreach(row => {
      producer.send(new ProducerRecord[String, String]("topicName", row.getAs[String]("keyCol"), row.getAs[String]("valCol")), new Callback() {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          // Callback code goes here
        }
      })
    })
    Thread.sleep(60000) // Sleep for 1 minute after each batch
  })
  producer.close()
})
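One thing to watch: foreachPartition runs on all partitions in parallel, so sleeping one minute per 10000-record batch in each partition gives roughly numPartitions × 10000 records per minute for the topic as a whole. A sketch of one way to keep the global rate near 10000/minute is to divide the batch size by the partition count (the variable names and Kafka config values below are illustrative, not from the original code):
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.Row

// With N partitions writing in parallel, let each partition send ~10000 / N
// records per minute so the topic-wide rate stays near 10000 per minute.
val numPartitions = spark_df.rdd.getNumPartitions
val perPartitionBatch = math.max(1, 10000 / numPartitions)

val writeBatches: Iterator[Row] => Unit = { rows =>
  // Producer config built on the executor; broker and serializer values are assumed
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  rows.grouped(perPartitionBatch).foreach { batch =>
    batch.foreach { row =>
      producer.send(new ProducerRecord[String, String](
        "topicName", row.getAs[String]("keyCol"), row.getAs[String]("valCol")))
    }
    producer.flush()          // make sure the batch is on the wire before pausing
    Thread.sleep(60 * 1000L)  // wait one minute before the next batch
  }
  producer.close()
}

spark_df.foreachPartition(writeBatches)
If exact throttling is not required, the built-in Kafka batch sink (df.write.format("kafka") with key/value columns) is simpler, but it does not rate-limit on its own.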
Upvotes: 2