Reputation: 21
I'm trying to select all elements present in foo but not present in bar. I'm using this code:
val fooPrepared = foo.repartition(1).cache() // repartition returns a new DataFrame, so keep the result
val barPrepared = bar.repartition(1).cache()
fooPrepared.select("col_1").except(barPrepared.select("col_1"))
Is there a better or faster way to do this? It currently takes 15+ minutes when running on a cluster.
Additional info: foo will have around 100-1000 rows, while bar will have 40 million+. foo is a DataFrame read from a Hive table (50 columns) using Spark SQL; bar is a DataFrame read from a Kudu table (250 columns) using KuduContext.
Using Spark 2.2 on CDH 5.15.x with Scala 2.11.8.
Upvotes: 1
Views: 108
Reputation: 7316
As @Tzach suggested, it is better to avoid bar.repartition(1) and bar.cache(), since the bar dataset seems too big to fit in memory. You can still cache the small dataset, or even better broadcast it to each executor. Also, if you know the size of the big dataset, you can calculate the partition count as partitionsNum = totalSize / 500MB; 250-500 MB is a good target size per partition, so for 10 GB of data that gives 10GB / 500MB = 20 partitions.
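As a minimal sketch of that calculation (the size value here is illustrative; in practice you would obtain it from your Hive/Kudu table statistics):
// Illustrative values only: totalSizeBytes must come from your own measurement
val totalSizeBytes = 10L * 1024 * 1024 * 1024 // assume ~10 GB of data
val targetPartitionBytes = 500L * 1024 * 1024 // aim for ~500 MB per partition
val partitionsNum = math.max(1, (totalSizeBytes / targetPartitionBytes).toInt) // 20 here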
Here is your code after the changes mentioned above:
foo.cache() // feel free to cache the small dataset
val barRepartitioned = bar.repartition(partitionsNum) // optional; repartition returns a new DataFrame
foo.select("col_1").except(barRepartitioned.select("col_1"))
You can also try a left_anti join instead, as shown next, and compare their performance:
foo.join(bar, foo("col_1") === bar("col_1"), "left_anti").show
This will exclude all records from foo for which col_1 exists in bar.
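If you want to see how the two approaches differ before timing them, you can inspect their physical plans with explain() (standard Spark API):
foo.select("col_1").except(bar.select("col_1")).explain()
foo.join(bar, foo("col_1") === bar("col_1"), "left_anti").explain()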
If your requirement were the opposite, i.e. excluding records from bar that exist in foo, then your program could be even more efficient, since you could broadcast the small dataset foo to each executor, as in the next code snippet:
import org.apache.spark.sql.functions.broadcast
bar.join(broadcast(foo), bar("col_1") === foo("col_1"), "left_anti").show
Good luck!
Upvotes: 2