irbull

Reputation: 2530

Does joining additional columns in Spark scale horizontally?

I have a dataset with about 2.4M rows, with a unique key for each row. I have performed some complex SQL queries on some other tables, producing a dataset with two columns, a key and the value true. This dataset is about 500 rows. Now I would like to (outer) join this dataset with my original table.

This produces a new table with a very sparse set of values (true in about 500 rows, null elsewhere).

Finally, I would like to do this about 200 times, giving me a final table of about 201 columns (the key, plus the 200 sparse columns).
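Roughly, the pattern looks like this (a simplified sketch; the real paths, column names, and queries differ):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.appName("sparse-flags").getOrCreate()

// ~2.4M rows, one unique key per row
val base: DataFrame = spark.read.parquet("/data/base")

// each of the ~200 results of the complex SQL queries:
// ~500 rows of (key, value = true), renamed so the columns don't collide
val flags: Seq[DataFrame] = (1 to 200).map { i =>
  spark.read.parquet(s"/data/flag_$i").withColumnRenamed("value", s"flag_$i")
}

// 200 successive outer joins; every join extends the plan of `wide`
var wide = base
flags.foreach { f =>
  wide = wide.join(f, Seq("key"), "left_outer")
  wide.show()
}
```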

When I run this, I notice that it gets considerably slower as it runs. The first join takes 2 seconds, then 4s, then 6s, then 10s, then 20s, and after about 30 joins the system never recovers. Of course, the actual numbers are irrelevant since they depend on the cluster I'm running on, but I'm wondering:

Upvotes: 0

Views: 408

Answers (1)

user9028408

Reputation: 36

Is this slowdown expected?

Yes, to some extent it is. Joins are among the most expensive operations in data-intensive systems (it is not a coincidence that products which claim linear scalability usually take joins off the table). Join-like operations in a distributed system typically require data exchange between nodes, hitting a bunch of high-latency numbers.

In Spark SQL there is also the additional cost of computing the execution plan, which has greater-than-linear complexity.
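As a rough illustration (reusing the base / flags names from the sketch in the question): printing the plan every few iterations shows it growing with each join, and the time Catalyst spends analyzing and optimizing grows with it.

```scala
// Hypothetical check: every join makes the plan longer, and the whole plan
// is re-analyzed and re-optimized before each action runs.
var acc = base
flags.zipWithIndex.foreach { case (f, i) =>
  acc = acc.join(f, Seq("key"), "left_outer")
  if ((i + 1) % 10 == 0) acc.explain(true)   // plan text keeps growing
}
```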

I am using Parquet as the data storage format (columnar storage), so I was hopeful that adding more columns would scale horizontally. Is that a correct assumption?

No. Input format doesn't affect join logic at all.

All the columns I've joined so far are not needed for the Nth join, can they be unloaded from memory?

If they are truly excluded from the final output, they will be pruned from the execution plan. But since you join them for a reason, I assume that is not the case and they are required for the final output.
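You can check this with explain (again using the names from the question's sketch): if you select only the columns you actually need downstream, the unused ones disappear from the optimized plan.

```scala
// Column pruning: the optimized plan no longer carries flag columns
// that are never referenced after the join.
wide.select("key").explain(true)
wide.select("key", "flag_1").explain(true)
```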

Is there a way to really checkpoint so each join is just a join? I am actually calling show() after each join, so I assumed the join is actually happening at that point.

show computes only the small subset of the data required for its output. It doesn't cache anything, although shuffle files might be reused.

(it appears to include all previous joins, and it also includes the complex SQL queries, even though those have been checkpointed).

Checkpoints are created only if the data is fully computed, and they don't remove stages from the execution plan. If you want to do it explicitly, write the partial result to persistent storage and read it back at the beginning of each iteration (though that is probably overkill).
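A minimal sketch of that explicit variant (paths are made up; base and flags as in the question's sketch): round-tripping through storage means each iteration starts from a plan that is just a scan plus one join.

```scala
// Explicit "checkpoint" by writing and re-reading: the re-read DataFrame
// carries no memory of the previous joins in its plan.
var acc = base
flags.zipWithIndex.foreach { case (f, i) =>
  acc = acc.join(f, Seq("key"), "left_outer")
  val path = s"/tmp/wide_step_$i"
  acc.write.mode("overwrite").parquet(path)
  acc = spark.read.parquet(path)
}
```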

Are there other things I can do when combining lots of columns in spark?

The best thing you can do is to find a way to avoid joins completely. If the key is always the same, then a single shuffle and an operation on groups / partitions (with a *byKey method or window functions) might be a better choice.
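For example, one possible shape of that idea (assuming each small DataFrame has columns (key, flag_i), as in the question's sketch): tag each result with its flag name, union them, and build all the columns with a single groupBy / pivot, so only one join against the big table remains.

```scala
import org.apache.spark.sql.functions.{col, lit, first}

// One long (key, flag_name, value) table instead of 200 separate inputs
val tagged = flags.zipWithIndex.map { case (f, i) =>
  f.select(col("key"), lit(s"flag_$i").as("flag_name"), col(s"flag_$i").as("value"))
}.reduce(_ union _)

// One shuffle produces all flag columns at once (true where present, null elsewhere)
val allFlags = tagged.groupBy("key").pivot("flag_name").agg(first("value"))

// A single join with the 2.4M-row table instead of 200 successive ones
val result = base.join(allFlags, Seq("key"), "left_outer")
```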

However, if you

have a dataset with about 2.4M rows

then using a non-distributed system that supports in-place modification might be a much better choice.

In the most naive implementation you can compute each aggregate separately, sort it by key, and write it to disk. The data can then be merged line by line with a negligible memory footprint.
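A naive single-machine sketch of that last idea (the file layout is an assumption: one sorted file listing every key, plus one sorted file of keys per flag):

```scala
import scala.io.Source
import java.io.PrintWriter

def mergeFlags(keyPath: String, flagPaths: Seq[String], outPath: String): Unit = {
  // One buffered iterator per sorted flag file (each holds only the ~500 keys
  // for which that flag is true, one key per line, in the same sort order).
  val flagIters = flagPaths.map(p => Source.fromFile(p).getLines().buffered)
  val out = new PrintWriter(outPath)
  // keyPath lists every key, sorted; stream through it once, emitting one
  // output row per key with "true" where a flag file has a matching key.
  for (key <- Source.fromFile(keyPath).getLines()) {
    val row = flagIters.map { it =>
      if (it.hasNext && it.head == key) { it.next(); "true" } else ""
    }
    out.println((key +: row).mkString(","))
  }
  out.close()
}
```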

Upvotes: 2
