user1326784

Reputation: 657

How to control number of records while writing Spark Dataframe to Kafka Producer using Spark Java

I have a Spark dataframe with two columns, 'keyCol' and 'valCol'. The dataframe is huge, nearly 100 million rows. I want to write/produce the dataframe to a Kafka topic in mini batches, i.e. 10000 records per minute. The Spark job that creates this dataframe runs once per day.

How can I implement writing in mini batches of 10000 records per minute in the code below? Please also suggest if there is a better/more efficient way to implement this.

spark_df.foreachPartition(partitions -> {
            // One producer per partition, created on the executor to avoid serialization issues
            Producer<String, String> producer = new KafkaProducer<String, String>(allKafkaParamsMapObj);
            while (partitions.hasNext()) {
                Row row = partitions.next();
                producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       //Callback code goes here
                    }
                });
            }
            producer.close(); // flush pending records before the partition finishes
        });

Upvotes: 1

Views: 601

Answers (1)

QuickSilver

Reputation: 4045

You can use the grouped(10000) function as below and sleep the thread for a minute after each batch:

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.sql.Row

spark_df.foreachPartition { partition: Iterator[Row] =>
      val producer = new KafkaProducer[String, String](allKafkaParamsMapObj)
      partition.grouped(10000).foreach { rowSeq: Seq[Row] => // Process 10000 rows per batch
        rowSeq.foreach { row =>
          producer.send(new ProducerRecord[String, String]("topicName", row.getAs[String]("keyCol"), row.getAs[String]("valCol")), new Callback() {
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
              //Callback code goes here
            }
          })
        }
        Thread.sleep(60000) // Sleep for 1 minute after each batch
      }
      producer.close()
    }
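Since you are working in Spark Java, here is a rough Java sketch of the same idea. Java iterators have no grouped(), so the batching is done with a manual counter; the names spark_df, allKafkaParamsMapObj, topicName, keyCol and valCol are taken from your snippet, and the batch size and sleep interval are just the values you mentioned.

    import java.util.Iterator;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.function.ForeachPartitionFunction;
    import org.apache.spark.sql.Row;

    spark_df.foreachPartition((ForeachPartitionFunction<Row>) partitions -> {
        Producer<String, String> producer = new KafkaProducer<String, String>(allKafkaParamsMapObj);
        int count = 0;
        while (partitions.hasNext()) {
            Row row = partitions.next();
            producer.send(new ProducerRecord<String, String>("topicName",
                    row.getAs("keyCol"), row.getAs("valCol")));
            count++;
            if (count % 10000 == 0 && partitions.hasNext()) {
                producer.flush();    // make sure the current batch is actually sent
                Thread.sleep(60000); // wait a minute before producing the next batch
            }
        }
        producer.close();
    });

Note that the sleep throttles each partition independently, so with N partitions running in parallel the overall rate is roughly N * 10000 records per minute; coalesce or repartition the dataframe first if you need a strict global limit.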

Upvotes: 2
