Reputation: 393
I have a dataframe df =
+--------------------+
| id|
+-------------------+
|113331567dc042f...|
|5ffbbd1adb4c413...|
|08c782a4ae854e8...|
|24cee418e04a461...|
|a65f47c2aecc455...|
|a74355ef35d442d...|
|86f1a9b7ffc843b...|
|25c8abd6895e445...|
|b89ce33788f4484...|
.....................
with a million elements.
I want to repartition the dataframe into multiple partitions and pass each partition's elements as a list to a database API call that returns a Spark dataset.
Something like this.
df2 = df.repartition(10)
df2.foreachPartition { partition =>
val result = spark.read
.format("custom.databse")
.where(__key in partition.toList)
.load
}
And at the end I would like to union all the result datasets returned for each of the partitions.
The expected output would be a final dataset of strings.
+--------------------+
| customer names|
+-------------------+
|eddy |
|jaman |
|cally |
|sunny |
|adam |
.....................
Can anyone help me convert this to real code in Spark/Scala?
Upvotes: 1
Views: 824
Reputation: 2091
Conceptually, what you are asking is not really possible in Spark.
Your API call is a SparkContext-dependent function (i.e. spark.read), and one cannot use a SparkContext inside a partition function. In simpler words, you cannot pass the spark object to the executors.
For an even simpler picture: think of a Dataset having each row be a Dataset itself. Is that even possible? No.
In your case there are two ways to solve this:
Case 1: One by one, then union
//split the keys into 10000-sized lists
val listOfListOfKeys : List[List[String]] =
  df.collect().map(_.getString(0)).grouped(10000).toList
//bring the Dataset for the 1st 10000 keys (1st list)
var resultDf = spark.read.format("custom.databse")
  .where(__key in listOfListOfKeys.head).load
//bring the rest of them, unioning as we go
for (listOfKeys <- listOfListOfKeys.tail) {
  val tempDf = spark.read.format("custom.databse")
    .where(__key in listOfKeys).load
  resultDf = resultDf.union(tempDf)
}
There will be scaling issues with this approach because all the keys are collected on the driver. But if you want to use the "spark.read" api, then this might be the only easy way.
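If it helps, the same one-by-one logic can be written a bit more compactly by mapping over the key chunks and folding with union. The "keys" option below is a hypothetical stand-in for however your custom datasource actually takes its key filter:
import org.apache.spark.sql.DataFrame

//same approach as above, written as map + reduce(union)
val resultDf: DataFrame = df.collect().map(_.getString(0))
  .grouped(10000).toList
  .map { listOfKeys =>
    spark.read.format("custom.databse")
      .option("keys", listOfKeys.mkString(",")) //hypothetical filter option
      .load()
  }
  .reduce(_ union _)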
Case 2: foreachPartition + a normal DB call which returns an iterator
If you can find another way to get the data from your DB which returns an iterator or any single-threaded, Spark-independent object, then you can achieve exactly what you want by applying what Filip has answered, i.e. df.repartition(10).rdd.foreachPartition(yourDbCallFunction)
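A minimal sketch of that idea, using mapPartitions instead of foreachPartition so the per-partition results come back as a Dataset that Spark can union for you. CustomDbClient.lookup is a hypothetical single-threaded client that does not touch the SparkSession:
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

//hypothetical plain client: takes a batch of keys, returns customer names;
//it must not use SparkSession/SparkContext, so it is safe on executors
object CustomDbClient {
  def lookup(keys: Seq[String]): Iterator[String] = ???
}

val names: Dataset[String] = df
  .repartition(10)
  .mapPartitions { rows =>
    val keys = rows.map(_.getString(0)).toSeq //keys of this partition
    CustomDbClient.lookup(keys)               //one DB call per partition
  }

names.show()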
Upvotes: 1
Reputation: 661
From what I see in the documentation, it could be possible to do something like this. You'll have to use the RDD API and SparkContext, so you could use parallelize to partition your data into n partitions. After that you can call foreachPartition, which should already give you an iterator on your data directly, with no need to collect the data.
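One way to read this, as a hedged sketch: pull the keys to the driver, parallelize them into n partitions, and let foreachPartition hand each partition's iterator straight to a hypothetical Spark-independent DB call:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

//hypothetical Spark-independent DB call (plain JDBC/HTTP client, etc.)
def yourDbCallFunction(keys: List[String]): Unit = ???

//collect the keys, then parallelize them into n partitions
val keys: Seq[String] = df.collect().map(_.getString(0)).toSeq
val keysRdd = spark.sparkContext.parallelize(keys, numSlices = 10)

//each partition arrives as an Iterator[String]; no extra collect needed
keysRdd.foreachPartition { partitionKeys =>
  yourDbCallFunction(partitionKeys.toList)
}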
Upvotes: 1