Reputation: 454
I receive a Dataset and I am required to join it with another table. The simplest solution that came to my mind was to create a second Dataset for the other table and perform a joinWith.
def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] = {
  val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
  dogs.joinWith(cats, ...)
}
My main concern here is with spark.table("dev_db.cat"), as it feels like we are referring to all of the cat data, as in SELECT * FROM dev_db.cat, and then doing the join at a later stage. Or will the query optimizer perform the join directly, without reading the whole table? Is there a better solution?
Upvotes: 1
Views: 594
Reputation: 18013
You need to run an explain and see whether predicate pushdown is used. Then you can judge whether your concern is justified.
However, in general, if no complex datatypes are used and there are no obvious datatype mismatches, pushdown does take place. You can see that with a simple createOrReplaceTempView as well. See https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html
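For example, here is a minimal sketch (assuming the spark session, the dogs Dataset and the Dog/Cat case classes from the question, plus a hypothetical owner_id join key) that prints the plans so you can inspect the scan nodes:

// Build the join as in the question; `owner_id` is a placeholder key.
val cats = spark.table("dev_db.cat").as[Cat]
val joined = dogs.joinWith(cats, dogs("owner_id") === cats("owner_id"))

// In the physical plan, check the scan node for dev_db.cat:
// "PushedFilters: [...]" shows which predicates reached the data source.
joined.explain(true)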
Upvotes: 1
Reputation: 7316
Here are some suggestions for your case:
a. If you have where, filter, limit, take etc. operations, try to apply them before joining the two datasets. Spark can't push these kinds of filters down, so you have to reduce the number of target records yourself as much as possible. Here is an excellent source of information on the Spark optimizer.
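For example, a minimal sketch (assuming the dogs and cats Datasets from the question and hypothetical age, indoor and owner_id columns) that reduces both sides before the join:

import org.apache.spark.sql.functions.col

// Filter each side first; the column names below are placeholders.
val youngDogs  = dogs.filter(col("age") < 3)
val indoorCats = cats.filter(col("indoor") === true)

// Join the already-reduced datasets.
val joined = youngDogs.joinWith(indoorCats, youngDogs("owner_id") === indoorCats("owner_id"))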
b. Try to co-locate the datasets and minimize the shuffled data by using the repartition function. The repartitioning should be based on the keys that participate in the join, i.e.:
import org.apache.spark.sql.functions.col
// repartition returns a new Dataset, so keep the result and join on it
val dogsByKey = dogs.repartition(1024, col("key_col1"), col("key_col2"))
dogsByKey.join(cats, Seq("key_col1", "key_col2"), "inner")
c. Try to use broadcast for the smaller dataset if you are sure that it can fit in memory (or increase the value of spark.broadcast.blockSize). This gives a certain boost to the performance of your Spark program, since it ensures that the two datasets co-exist on the same node. For example:
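A minimal sketch using the broadcast hint from org.apache.spark.sql.functions (again assuming a hypothetical owner_id key):

import org.apache.spark.sql.functions.broadcast

// Hint Spark to replicate the smaller side to every executor,
// so the join can run without shuffling the larger `dogs` Dataset.
val smallCats = broadcast(cats)
val joined = dogs.joinWith(smallCats, dogs("owner_id") === smallCats("owner_id"))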
If you can't apply any of the above, then Spark has no way of knowing which records should be excluded, and it will therefore scan all the available rows from both datasets.
Upvotes: 1