bitan

Reputation: 454

Spark Dataset join performance

I receive a Dataset and I am required to join it with another table. The simplest solution that came to my mind was to create a second Dataset for the other table and perform a joinWith.

    def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] = {
      val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
      dogs.joinWith(cats, ...)
    }

My main concern here is with spark.table("dev_db.cat"), since it feels like we are referring to all of the cat data, as in

    SELECT * FROM dev_db.cat

and then doing a join at a later stage. Or will the query optimizer directly perform the join without referring to the whole table? Is there a better solution?

Upvotes: 1

Views: 594

Answers (2)

Ged

Reputation: 18013

You need to run an explain and see whether predicate pushdown is used. Then you can judge whether your concern is correct or not.

In general, however, if no complex datatypes are used and there are no datatype mismatches, pushdown does take place. You can see that with a simple createOrReplaceTempView as well. See https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html
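
For example, a minimal sketch of such a check (reusing dev_db.cat from the question; the age column and threshold are made up for illustration) could look like this:

    import org.apache.spark.sql.functions.col

    // Hypothetical example: filter the cat table and inspect the physical plan.
    val cats = spark.table("dev_db.cat")
    val filtered = cats.filter(col("age") > 3)

    // In the extended plan output, look for a "PushedFilters: [...]" entry on the scan node.
    filtered.explain(true)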

Upvotes: 1

abiratsis

Reputation: 7316

Here are some suggestions for your case:

a. If you have where, filter, limit, take etc. operations, try to apply them before joining the two datasets. Spark can't push these kinds of filters down for you, so you have to do it yourself, reducing the number of target records as much as possible. Here is an excellent source of information on the Spark optimizer.
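
For instance, a rough sketch of pre-filtering applied to the datasets from the question (the age, adopted and owner_id columns are made-up placeholders) could be:

    import org.apache.spark.sql.functions.col

    // Hypothetical example: shrink both sides before the join.
    val youngDogs   = dogs.filter(col("age") < 5)
    val adoptedCats = cats.filter(col("adopted") === true)
    youngDogs.joinWith(adoptedCats, youngDogs("owner_id") === adoptedCats("owner_id"))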

b. Try to co-locate the datasets and minimize the shuffled data by using the repartition function. The repartitioning should be based on the keys that participate in the join, e.g.:

    import org.apache.spark.sql.functions.col
    val repartitionedDogs = dogs.repartition(1024, col("key_col1"), col("key_col2"))
    val repartitionedCats = cats.repartition(1024, col("key_col1"), col("key_col2"))
    repartitionedDogs.join(repartitionedCats, Seq("key_col1", "key_col2"), "inner")

c. Try to use broadcast for the smaller dataset if you are sure that it can fit in memory (or increase the value of spark.broadcast.blockSize). This gives a certain boost to the performance of your Spark program, since it ensures that the two datasets co-exist within the same node.
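
A minimal sketch of an explicit broadcast hint for the question's join (again with a made-up owner_id join column) might be:

    import org.apache.spark.sql.functions.broadcast

    // Hypothetical example: ship the smaller cats dataset to every executor, avoiding a shuffle.
    dogs.joinWith(broadcast(cats), dogs("owner_id") === cats("owner_id"))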

If you can't apply any of the above, then Spark has no way to know which records should be excluded and will therefore scan all of the available rows from both datasets.

Upvotes: 1
