Sandeep Shetty

Reputation: 177

Iterating a huge data frame in spark/scala

I have a DataFrame with 500 million rows. I would like to iterate through each row, rename or drop a few columns, and update column values based on a few conditions. I am using the approach below, with collect.

df.collect.foreach(row => myCustomMethod(row))

As collect brings all the data to the driver, I am facing out-of-memory errors. Can you please suggest alternative ways of achieving the same result?

We are using the spark-cassandra connector by DataStax. I tried different approaches, but none of them improved the performance.

Upvotes: 3

Views: 8718

Answers (1)

Davis Broda

Reputation: 4137

Use a map operation instead of collect/foreach, then convert the resulting RDD back to a DataFrame. That allows the calculations to be distributed across the cluster instead of forcing all the data onto one node. You can do this by modifying your custom method to take and return a Row; the mapped RDD of Rows can then be converted back to a DataFrame with an explicit schema.

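import org.apache.spark.sql.types.StructType

val oldSchema = originalDf.schema
val newSchema: StructType = ??? // TODO: put new schema based on what you want to do
// Map over the underlying RDD[Row]; on Spark 2.x and later, DataFrame.map would need an Encoder
val newRdd = originalDf.rdd.map(row => myCustomMethod(row))
val newDf = sqlContext.createDataFrame(newRdd, newSchema)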

Dropping columns can then be handled through the .drop method on the new DataFrame.
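To make the steps above concrete, here is a minimal end-to-end sketch. It assumes Spark 2.x or later (hence SparkSession rather than sqlContext) running in local mode, and the column names, the sample data, and the body of myCustomMethod are all hypothetical placeholders rather than anything from the question.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RowMapSketch {
  // Hypothetical per-row logic: upper-case the name and cap the age at 100.
  // Rows obtained from a DataFrame carry their schema, so fields can be read by name.
  def myCustomMethod(row: Row): Row = {
    val name = row.getAs[String]("name")
    val age = row.getAs[Int]("age")
    Row(name.toUpperCase, math.min(age, 100), row.getAs[String]("unused"))
  }

  def transform(spark: SparkSession, originalDf: DataFrame): DataFrame = {
    // The new schema must match the Rows returned by myCustomMethod, field for field.
    val newSchema = StructType(Seq(
      StructField("name_upper", StringType, nullable = true),
      StructField("age_capped", IntegerType, nullable = false),
      StructField("unused", StringType, nullable = true)
    ))
    // The map runs on the executors; nothing is collected to the driver here.
    val newRdd = originalDf.rdd.map(myCustomMethod)
    // Drop the column that is no longer needed.
    spark.createDataFrame(newRdd, newSchema).drop("unused")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("row-map-sketch").getOrCreate()
    import spark.implicits._
    val originalDf = Seq(("alice", 30, "x"), ("bob", 150, "y")).toDF("name", "age", "unused")
    transform(spark, originalDf).show()
    spark.stop()
  }
}
```

In this sketch the renaming happens implicitly through the field names in newSchema, so only the column removal needs a separate .drop call.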

This may run into problems if your custom method is not serializable (or rather, if it references objects that are not serializable). In that case, switch to mapPartitions, so that each executor creates its own copy of the relevant objects once per partition instead of receiving them serialized from the driver.
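A rough sketch of the mapPartitions variant follows, again assuming Spark 2.x or later in local mode. The Normalizer class is a hypothetical stand-in for whatever non-serializable object the custom method depends on, and the single string column is illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper that is NOT Serializable (stands in for a client, parser, etc.).
class Normalizer {
  def normalize(s: String): String = s.trim.toLowerCase
}

object MapPartitionsSketch {
  def transform(spark: SparkSession, df: DataFrame): DataFrame = {
    val newSchema = StructType(Seq(StructField("value", StringType, nullable = true)))
    val newRdd = df.rdd.mapPartitions { rows =>
      // Constructed on the executor, once per partition, so it is never serialized.
      val normalizer = new Normalizer
      // This sketch assumes the first column is a non-null string.
      rows.map(row => Row(normalizer.normalize(row.getString(0))))
    }
    spark.createDataFrame(newRdd, newSchema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("map-partitions-sketch").getOrCreate()
    import spark.implicits._
    transform(spark, Seq("  Foo ", "BAR").toDF("value")).show()
    spark.stop()
  }
}
```

The helper is created inside the closure, so the closure itself captures nothing from the driver and there is nothing non-serializable to ship. The returned iterator is lazy, which keeps each partition streaming rather than materialized in memory.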

Upvotes: 3
