user1579557

Reputation: 393

How to parallelize operations on partitions of a dataframe

I have a dataframe df =

+------------------+
|                id|
+------------------+
|113331567dc042f...|
|5ffbbd1adb4c413...|
|08c782a4ae854e8...|
|24cee418e04a461...|
|a65f47c2aecc455...|
|a74355ef35d442d...|
|86f1a9b7ffc843b...|
|25c8abd6895e445...|
|b89ce33788f4484...|
....................

with a million elements.
I want to repartition the dataframe into multiple partitions and pass the elements of each partition as a list to a database API call that returns a Spark Dataset.

Something like this.

df2 = df.repartition(10)

df2.foreachPartition { partition =>
   val result = spark.read
                .format("custom.databse")
                .where(__key in partition.toList)
                .load
}

At the end I would like to union all the result datasets returned for the individual partitions.

The expected output is a final dataset of strings:

+------------------+
|    customer names|
+------------------+
|eddy              |
|jaman             |
|cally             |
|sunny             |
|adam              |
....................

Can anyone help me convert this to real Spark Scala code?

Upvotes: 1

Views: 824

Answers (2)

Sanket9394

Reputation: 2091

Conceptually, what you are asking is not really possible in Spark.

Your API call (spark.read) depends on the SparkSession, and through it on the SparkContext, which exists only on the driver and cannot be used inside a partition function. In simpler words, you cannot pass the spark object to the executors.

To picture it even more simply: think of a Dataset in which each row is itself a Dataset. Is that possible? No.


In your case there are two ways to solve this:

Case 1: One by one, then union

  • Convert the keys to a list and split them evenly.
  • For each split, call the spark.read API and keep unioning the results.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Collect the keys to the driver and split them into lists of 10000
val listOfListOfKeys: List[List[String]] =
  df.collect().map(_.getString(0)).toList.grouped(10000).toList

// Load one Dataset per list of keys. DataFrameReader has no where method,
// so the filter is applied after load(); whether it is pushed down to the
// database depends on the connector.
val results: List[DataFrame] = listOfListOfKeys.map { listOfKeys =>
  spark.read.format("custom.databse").load()
    .where(col("__key").isin(listOfKeys: _*))
}

// Union all partial results (assumes df contains at least one key)
val resultDf: DataFrame = results.reduce(_ union _)

There will be scaling issues with this approach because all the keys are collected on the driver. But if you want to use the spark.read API, this might be the only easy way.

Case 2: foreachPartition + a normal DB call that returns an iterator

If you can find another way to get the data from your DB, one that returns an iterator or any other single-threaded, Spark-independent object, then you can achieve exactly what you want by applying what Filip has answered, i.e. df.repartition(10).rdd.foreachPartition(yourDbCallFunction)
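A minimal sketch of Case 2, under a few assumptions. fetchCustomerNames is a hypothetical stand-in for a Spark-independent database client (here it only fabricates placeholder names so the example runs), and the sketch uses mapPartitions rather than foreachPartition, because foreachPartition returns Unit while mapPartitions hands the per-partition results back as one Dataset, so no explicit union is needed.

```scala
import org.apache.spark.sql.SparkSession

object PartitionLookup {
  // Hypothetical stand-in for a Spark-independent DB call: takes the keys of
  // one partition and returns an iterator of names. A real implementation
  // would open a plain (non-Spark) client connection here.
  def fetchCustomerNames(keys: Seq[String]): Iterator[String] =
    keys.iterator.map(k => s"name-for-$k")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partition-lookup")
      .getOrCreate()
    import spark.implicits._

    // Small sample in place of the real dataframe of ids
    val df = Seq("113331567dc042f", "5ffbbd1adb4c413", "08c782a4ae854e8").toDF("id")

    val names = df.repartition(10)
      .as[String]
      .mapPartitions { partition =>
        // Runs on the executors: one lookup call per partition
        val keys = partition.toList
        if (keys.isEmpty) Iterator.empty else fetchCustomerNames(keys)
      }
      .toDF("customer names")

    names.show(false)
    spark.stop()
  }
}
```

The function passed to mapPartitions only calls a method on a top-level object, so nothing from the driver (in particular not the spark object) has to be serialized to the executors.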

Upvotes: 1

Filip

Reputation: 661

From what I see in the documentation, it should be possible to do this. You'll have to use the RDD API and the SparkContext, so you can use parallelize to split your data into n partitions. After that you can call foreachPartition, which gives you an iterator over each partition's data directly, with no need to collect it.
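Roughly, that could look like the sketch below. The names processKeys and ParallelizeSketch are made up for illustration, and processKeys only counts keys where a real job would call the database. Note that parallelize takes a local collection on the driver; if the ids already live in a DataFrame, df.repartition(n).rdd gives an RDD without collecting. foreachPartition returns Unit, so it suits side effects only, and an accumulator is used here just to report something back to the driver.

```scala
import org.apache.spark.sql.SparkSession

object ParallelizeSketch {
  // Hypothetical per-partition action: a real job would call the database
  // with these keys. Here it only reports how many keys it received.
  def processKeys(keys: List[String]): Long = keys.size.toLong

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parallelize-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Local sample data on the driver, split into 2 partitions
    val ids = Seq("113331567dc042f", "5ffbbd1adb4c413", "08c782a4ae854e8", "24cee418e04a461")
    val rdd = sc.parallelize(ids, numSlices = 2)

    // Accumulator to carry a count from the executors back to the driver
    val processed = sc.longAccumulator("processed keys")

    rdd.foreachPartition { partition =>
      // partition is an Iterator[String] over this partition's ids
      processed.add(processKeys(partition.toList))
    }

    println(s"processed ${processed.value} keys")
    spark.stop()
  }
}
```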

Upvotes: 1
