alexanoid

Reputation: 25852

Apache Spark, range-joins, data skew and performance

I have the following Apache Spark SQL join predicate:

t1.field1 = t2.field1 and t2.start_date <= t1.event_date and t1.event_date < t2.end_date

data:

The t1 DataFrame has over 50 million rows
The t2 DataFrame has over 2 million rows

Almost all t1.field1 values in the t1 DataFrame are the same (null).

Right now the Spark cluster hangs for more than 10 minutes on a single task while performing this join, because of data skew. Only one worker, and one task on that worker, is doing any work at that point; the other 9 workers are idle. How can I improve this join so that the load is distributed across the whole Spark cluster?

Upvotes: 2

Views: 6689

Answers (3)

Eyal

Reputation: 3513

If almost all the rows in t1 have t1.field1 = null, and the event_date column is numeric (or you convert it to a timestamp), you can first use Apache DataFu to do a range join, and then filter out the rows in which t1.field1 != t2.field1 afterwards.

The range join code would look like this:

t1.joinWithRange("event_date", t2, "start_date", "end_date", 10)

The last argument - 10 - is the decrease factor. This does bucketing, as Raphael Roth suggested in his answer.
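A minimal end-to-end sketch of that flow in Scala, range-join first and equality-filter second (the datafu.spark.DataFrameOps import, the t2_field1 rename, and the assumption that joinWithRange carries both DataFrames' columns through are mine, not from the answer):

    import datafu.spark.DataFrameOps._          // assumed import that provides the joinWithRange implicit
    import org.apache.spark.sql.functions.col

    // Rename t2.field1 up front so the post-join filter is unambiguous
    // (assumes joinWithRange passes both DataFrames' columns through).
    val t2Renamed = t2.withColumnRenamed("field1", "t2_field1")

    val ranged = t1.joinWithRange("event_date", t2Renamed, "start_date", "end_date", 10)

    // Apply the equality condition afterwards, as described above.
    val result = ranged.filter(col("field1") === col("t2_field1"))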

You can see an example of such a ranged join in the blog post introducing DataFu-Spark.

Full disclosure - I am a member of DataFu and wrote the blog post.

Upvotes: 3

Raphael Roth

Reputation: 27373

I assume Spark already pushes a not-null filter on t1.field1; you can verify this in the explain plan.

I would rather experiment with creating an additional attribute that can be used as an equi-join condition, e.g. by bucketing. For example, you could create a month attribute. To do this, you would need to enumerate the months in t2, which is usually done using UDFs. See this SO question for an example: How to improve broadcast Join speed with between condition in Spark
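A hedged Scala sketch of that bucketing idea, swapping the UDF for Spark 2.4+'s built-in sequence function; the column and table names follow the question, everything else is an assumption:

    import org.apache.spark.sql.functions._

    // Derive a "month" key on both sides so the range join gains an equi-join condition.

    // t1: one month bucket per event.
    val t1b = t1.withColumn("month", date_format(col("event_date"), "yyyy-MM"))

    // t2: enumerate every month covered by [start_date, end_date] and explode,
    // so each interval row appears once per month it spans.
    val t2b = t2
      .withColumn("month", explode(expr(
        "sequence(date_trunc('month', start_date), date_trunc('month', end_date), interval 1 month)")))
      .withColumn("month", date_format(col("month"), "yyyy-MM"))

    // The month equality lets Spark shuffle on (field1, month) instead of field1 alone,
    // while the original range predicate still applies.
    val joined = t1b.join(t2b,
      t1b("field1") === t2b("field1") &&
      t1b("month")  === t2b("month")  &&
      t2b("start_date") <= t1b("event_date") &&
      t1b("event_date") <  t2b("end_date"))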

Upvotes: 1

Bishnu

Reputation: 423

I am assuming you are doing an inner join.

The following steps can be followed to optimise the join (a minimal sketch of points 2, 3 and 5 follows the list):

  1. Before joining, filter t1 and t2 based on the smallest/largest start_date, event_date and end_date. This reduces the number of rows.

  2. Check whether the t2 dataset has null values for field1; if it doesn't, t1 can be filtered on a notNull condition before the join, which reduces the size of t1.

  3. If your job gets only a few of the available executors, you have too few partitions. Simply repartition the dataset, choosing a number that doesn't leave you with too many (or too few) partitions.

  4. You can check whether the partitioning is even (no skew) by looking at task execution times; they should be similar.

  5. If the smaller dataset fits in executor memory, a broadcast join can be used.
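A minimal Scala sketch of points 2, 3 and 5, assuming an inner join; the partition count is a placeholder, not a recommendation:

    import org.apache.spark.sql.functions.{broadcast, col}

    // Point 2: drop t1 rows whose field1 is null (they can never match in an inner join),
    // and point 3: repartition so the work spreads across the cluster.
    val t1Filtered = t1.filter(col("field1").isNotNull).repartition(200)  // 200 is a placeholder

    // Point 5: broadcast the smaller t2 if it fits in executor memory.
    val joined = t1Filtered.join(
      broadcast(t2),
      t1Filtered("field1") === t2("field1") &&
      t2("start_date") <= t1Filtered("event_date") &&
      t1Filtered("event_date") < t2("end_date"))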

You might like to read - https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/Apache-Spark-Join-guidelines-and-Performance-tuning

Upvotes: 3
