detcle

Reputation: 79

How to efficiently join two directories that are already partitioned

Suppose I have two different data sets A and B, and they are both already partitioned by joinKey and laid out in the filesystem as A/joinKey/<files> and B/joinKey/<files>.

For example, let's say joinKey is some hash value. Then, there may be directories like A/000/<files>, A/001/<files>, A/002/<files>, and so on. And there is also B/000/<files>, B/001/<files>, B/002/<files>, and so on.

I can only join with the same joinKey, so I can only join A/000/<files> with B/000/<files>; I can only join A/001/<files> with B/001/<files>; and so on.

What is the best way to write a Spark job that joins the data in A/000/<files> with the data in B/000/<files> to produce output C/000/<files>; then joins the data in A/001/<files> with the data in B/001/<files> to produce output C/001/<files>; and so on?

My idea right now is that we would iterate over all joinKey values in Spark (e.g. do a simple for loop over the string values 000, 001, 002, etc.), and then load A/000/<files> as a DataFrame and load B/000/<files> as a DataFrame and join them; then, load A/001/<files> as a DataFrame and load B/001/<files> as a DataFrame and join. Is this reasonable, or is there a better approach?
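The looping idea above can be sketched without Spark at all: discover which joinKey values exist under both roots, and pair up the corresponding directories to be joined. The roots, keys, and `pair_partitions` helper below are all hypothetical, purely for illustration (in a real job, the key lists would come from listing the filesystem, and each pair would be loaded and joined as two DataFrames):

```python
# Hypothetical helper: given the two root directories and the joinKey
# subdirectories found under each, compute the (A/<key>, B/<key>) pairs
# that can be joined. Only keys present under BOTH roots produce a pair.
def pair_partitions(a_root, b_root, a_keys, b_keys):
    common = sorted(set(a_keys) & set(b_keys))
    return [(f"{a_root}/{k}", f"{b_root}/{k}") for k in common]

# a_keys/b_keys stand in for a directory listing of A/ and B/.
pairs = pair_partitions("A", "B", ["000", "001", "002"], ["000", "001", "003"])
print(pairs)  # [('A/000', 'B/000'), ('A/001', 'B/001')]
```

Each resulting pair is then one independent unit of work: load both paths, join, and write the output for that key.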

For reference, suppose A/000/<files>, loaded as a DataFrame, has the schema <username, age>, while B/000/<files> has the schema <username, email>, and I want to join the two DataFrames on the username column. This join could happen within a single executor or node, since both tables (A/000/<files> and B/000/<files>) are small enough to fit in memory without spreading the data across multiple executors. So one executor would join A/000/<files> with B/000/<files>; another executor would join A/001/<files> with B/001/<files>.
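Within one partition, joining two small tables on username is just an in-memory hash join. A plain-Python sketch with made-up rows (in Spark this would be two small DataFrames joined on the "username" column):

```python
# One partition's worth of data (made-up rows standing in for A/000 and B/000).
a_rows = [("alice", 30), ("bob", 25)]         # schema: (username, age)
b_rows = [("alice", "alice@example.com")]     # schema: (username, email)

# Build a hash map on the join column from the smaller side,
# then do an inner join: keep only usernames present in both tables.
email_by_user = dict(b_rows)
joined = [(u, age, email_by_user[u])
          for (u, age) in a_rows if u in email_by_user]

print(joined)  # [('alice', 30, 'alice@example.com')]
```

Because all the rows for one joinKey sit on one executor, this join needs no shuffle; only the username lookup happens, entirely in memory.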

The idea here is that the datasets A and B are already partitioned by joinKey (000, 001, 002, etc.) in the filesystem. Only 000 data from A can join with 000 data from B, so it would be great to take advantage of this by having one executor handle the 000 data from both A and B. That executor can then do the join without any shuffling or repartitioning, keeping both DataFrames entirely in memory and joining on a second key, the username in this example.

Upvotes: 1

Views: 64

Answers (1)

Matt Andruff

Reputation: 5135

Are they really partitioned by the same key? If so, then yes, use two DataFrames; it should be a fast join.

Your question is a little unclear, so...

If you are merely saying that they are in a similar file structure, then yes, use two DataFrames. It won't be as fast, but this is still how you join two data sets.

You are correct in thinking that if they're partitioned the same, the join will be faster than with a non-matching partition key.

Once you join them, write the result to file. Only partition the joined output if you intend to access the data by the partition key; partitioning slows down general queries that don't involve the partition key.

Upvotes: 1
