Reputation: 1
I have a very large dataset that I need to join in order to enrich it with additional columns.
Dataset A has the following structure:
origin|destination|segment1_origin|segment2_origin|segment3_origin|segment4_origin|segment5_origin|segment6_origin|segment1_destination|segment2_destination|segment3_destination|segment4_destination|segment5_destination|segment6_destination
and contains around 5 billion rows
Dataset B has the following structure:
origin|destination|stops|route
Dataset B holds information about every segment in dataset A and is almost 6 times the size of dataset A.
To enrich the stop and route details, what I'm doing right now is:
var enriched = DatasetA
for (x <- 1 to 6) {
  // Rename the enrichment columns so each join adds segment-specific columns.
  val segB = DatasetB.withColumnRenamed("stops", s"segment${x}_stops")
    .withColumnRenamed("route", s"segment${x}_route")
  enriched = enriched.join(segB, enriched(s"segment${x}_origin") === segB("origin")
    && enriched(s"segment${x}_destination") === segB("destination"), "left")
    .drop(segB("origin")).drop(segB("destination"))
}
This solution works, but my concern is that I'm joining 6 times. Is there any way to optimize this? The joins cause skew and the job slows down in the later stages.
Is there a better way to write this with Scala/Spark DataFrames?
Upvotes: 0
Views: 171
Reputation: 1307
You can do the join with spark.sql using an OR condition, so that you don't need to loop over the segments.
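A minimal sketch of what that single join could look like, assuming the DataFrames from the question are registered as temp views (the view names a and b are illustrative). Note that an OR-ed condition produces one output row per matching segment, so the result still has to be reshaped back into per-segment columns:

DatasetA.createOrReplaceTempView("a")
DatasetB.createOrReplaceTempView("b")

val joined = spark.sql("""
  SELECT a.*, b.stops, b.route
  FROM a
  LEFT JOIN b
    ON (a.segment1_origin = b.origin AND a.segment1_destination = b.destination)
    OR (a.segment2_origin = b.origin AND a.segment2_destination = b.destination)
    OR (a.segment3_origin = b.origin AND a.segment3_destination = b.destination)
    OR (a.segment4_origin = b.origin AND a.segment4_destination = b.destination)
    OR (a.segment5_origin = b.origin AND a.segment5_destination = b.destination)
    OR (a.segment6_origin = b.origin AND a.segment6_destination = b.destination)
""")

It is worth checking the physical plan afterwards: a purely OR-ed join condition is not an equi-join, so Spark may not be able to use a sort-merge or shuffle-hash join for it.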
Also, if your data is skewed and a few partitions are taking most of the time, you can try the salting technique. It increases the size of the data, but the joins won't be stuck for as long.
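A minimal sketch of salting applied to one of the segment joins; the salt column and the saltBuckets count are illustrative names, not from the original post. The idea is to add a random salt to the skewed side, replicate the other side once per salt value, and include the salt in the join key:

import org.apache.spark.sql.functions._

val saltBuckets = 32  // illustrative; tune to the skew seen in the Spark UI

// Random salt on the skewed side so hot keys spread across more partitions.
val saltedA = DatasetA.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate the other side once per salt value so every salted key can still match.
val saltedB = DatasetB.withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i)): _*)))

val joined = saltedA.join(saltedB,
  saltedA("segment1_origin") === saltedB("origin") &&
  saltedA("segment1_destination") === saltedB("destination") &&
  saltedA("salt") === saltedB("salt"),
  "left").drop("salt")

The bucket count trades extra replication of DatasetB against better partition balance, which is why salting increases the overall data size.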
Upvotes: 1