user6251278

Reputation: 23

Spark - Collect partitions using foreachPartition

We are using Spark for file processing. The files are pretty big, around 30 GB each with about 40-50 million lines. They are formatted, and we load them into a data frame. The initial requirement was to identify records matching certain criteria and load them into MySQL. We were able to do that.

The requirement changed recently: records not meeting the criteria now have to be stored in an alternate DB. This is causing an issue, as the size of the collection is too big. We are trying to collect each partition independently and merge the results into a list, as suggested here:

https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/dont_collect_large_rdds.html

We are not familiar with Scala, so we are having trouble converting this to Java. How can we iterate over the partitions one by one and collect them?
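
For reference, the closest Java sketch we could come up with is below; we are assuming Dataset.toLocalIterator() is the right equivalent of the Scala suggestion, and saveToAlternateDb is just a hypothetical placeholder for our DB code:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PartitionCollector {

    // Sketch: pull rows back to the driver one partition at a time instead of
    // a single collect(); toLocalIterator() only needs enough driver memory
    // for the largest partition.
    static void collectInBatches(Dataset<Row> nonMatching) {
        List<Row> buffer = new ArrayList<>();
        Iterator<Row> rows = nonMatching.toLocalIterator();
        while (rows.hasNext()) {
            buffer.add(rows.next());
            if (buffer.size() >= 10000) {       // flush in batches
                saveToAlternateDb(buffer);      // hypothetical helper
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            saveToAlternateDb(buffer);          // flush the remainder
        }
    }

    // Hypothetical placeholder for whatever JDBC code writes the batch.
    static void saveToAlternateDb(List<Row> batch) {
    }
}

Is this the right direction, or is there a better way to do it per partition?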

Thanks

Upvotes: 2

Views: 10596

Answers (1)

Ravikumar

Reputation: 1131

Please use df.foreachPartition to process each partition independently; it does not return anything to the driver. You can save the matching results into the DB at the executor level. If you want to collect the results on the driver, use mapPartitions, which is not recommended in your case.

Please refer to the link below:

Spark - Java - foreachPartition

import java.util.Iterator;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;

dataset.foreachPartition(new ForeachPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> rows) throws Exception {
        while (rows.hasNext()) {
            Row row = rows.next();
            System.out.println(row.getString(1));
        }
        // do your business logic and load into MySQL here.
    }
});
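
The "load into MySQL" part inside the partition function could look roughly like the sketch below; the JDBC URL, credentials, and the records table with its single column are made-up placeholders. The idea is one connection per partition with batched inserts (the MySQL JDBC driver has to be on the executor classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;

dataset.foreachPartition(new ForeachPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> rows) throws Exception {
        // One connection and one prepared statement per partition,
        // opened on the executor that owns the partition.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://host:3306/mydb", "user", "password");   // placeholder URL/credentials
             PreparedStatement stmt = conn.prepareStatement(
                 "INSERT INTO records (col1) VALUES (?)")) {           // placeholder table/column
            int pending = 0;
            while (rows.hasNext()) {
                Row row = rows.next();
                stmt.setString(1, row.getString(1));
                stmt.addBatch();
                if (++pending == 1000) {      // flush every 1000 rows
                    stmt.executeBatch();
                    pending = 0;
                }
            }
            if (pending > 0) {
                stmt.executeBatch();          // flush the remainder
            }
        }
    }
});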

For mapPartitions:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// You could reuse Row here, but for clarity I am defining a separate class.
public class ResultEntry implements Serializable {
    // define your df properties ...
}


Dataset<ResultEntry> mappedData = data.mapPartitions(new MapPartitionsFunction<Row, ResultEntry>() {

    @Override
    public Iterator<ResultEntry> call(Iterator<Row> it) {
        List<ResultEntry> filteredResult = new ArrayList<ResultEntry>();
        while (it.hasNext()) {
            Row row = it.next();
            // somecondition and convertToResultEntry are placeholders for your own logic.
            if (somecondition) {
                filteredResult.add(convertToResultEntry(row));
            }
        }
        return filteredResult.iterator();
    }
}, Encoders.javaSerialization(ResultEntry.class));
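
If the non-matching records simply need to land in the alternate DB, one option is to skip collecting altogether and let Spark write mappedData straight through its JDBC data source. For that to produce proper columns, ResultEntry should be a Java bean and the encoder above should be Encoders.bean(ResultEntry.class) rather than javaSerialization (which stores the whole object in a single binary column). The URL, table name, and credentials below are placeholders:

import java.util.Properties;

import org.apache.spark.sql.SaveMode;

Properties connProps = new Properties();
connProps.setProperty("user", "dbuser");             // placeholder credentials
connProps.setProperty("password", "dbpassword");
connProps.setProperty("driver", "com.mysql.jdbc.Driver");   // driver of whatever the alternate DB is

// Each executor writes its own partitions; nothing is pulled back to the driver.
mappedData.write()
          .mode(SaveMode.Append)
          .jdbc("jdbc:mysql://otherhost:3306/alternate_db",  // placeholder URL
                "non_matching_records",                      // placeholder table
                connProps);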

Hope this helps.

Ravi

Upvotes: 3
