Reputation: 26789
Suppose I have two key-value data sets--Data Sets A and B, let's call them. I want to update all the data in Set A with data from Set B where the two match on keys.
Because I'm dealing with such large quantities of data, I'm using Hadoop MapReduce. My concern is that to do this key matching between A and B, I need to load all of Set A (a lot of data) into the memory of every mapper instance. That seems rather inefficient.
Would there be a recommended way to do this that doesn't require repeating the work of loading in A every time?
Some pseudocode to clarify what I'm currently doing:
    Load in Data Set A  # This seems like the expensive step to always be doing
    Foreach key/value in Data Set B:
        If key is in Data Set A:
            Update Data Set A
Upvotes: 4
Views: 1878
Reputation: 18434
All of the answers posted so far are correct - this should be a Reduce-side join... but there's no need to reinvent the wheel! Have you considered Pig, Hive, or Cascading for this? They all have joins built-in, and are fairly well optimized.
Upvotes: 3
Reputation: 841
This video tutorial by Cloudera gives a great description of how to do a large-scale Join through MapReduce, starting around the 12 minute mark.
Here are the basic steps he lays out for joining records from file B onto records from file A on key K, with pseudocode. If anything here isn't clear, I'd suggest watching the video as he does a much better job explaining it than I can.
In your Mapper:
    K from file A:
        tag K to identify as Primary Key
        emit <K, value of K>
    K from file B:
        tag K to identify as Foreign Key
        emit <K, record>
Write a Sorter and Grouper which will ignore the PK/FK tagging, so that your records are sent to the same Reducer regardless of whether they are a PK record or a FK record and are grouped together.
Write a Comparator which will compare the PK and FK keys and send the PK first.
The result of this step will be that all records with the same key will be sent to the same Reducer and be in the same set of values to be reduced. The record tagged with PK will be first, followed by all records from B which need to be joined. Now, the Reducer:
    value_of_PK = values[0]  // First value is the value of your primary key
    for value in values[1:]:
        value.replace(FK, value_of_PK)  // Replace the foreign key with the key's value
        emit <key, value>
The result of this will be file B, with all occurrences of K replaced by the value of K in file A. You can also extend this to effect a full inner join, or to write out both files in their entirety for direct database storage, but those are pretty trivial modifications once you get this working.
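The steps above can be tried out locally with a small pure-Python simulation. This is only a sketch of the idea, not Hadoop code: the names `map_a`, `map_b`, `shuffle`, `reduce_join`, and `run_join` are hypothetical, file B records are assumed to be dicts, and keys in file A are assumed to be unique. The `shuffle` function stands in for the Sorter, Grouper, and Comparator: it sorts on the composite (key, tag) so the PK record comes first, then groups on the key alone.

```python
from itertools import groupby

# Tags used in the composite key; PK < FK so the primary-key record sorts first.
PK, FK = 0, 1


def map_a(key, value):
    # Record from file A: emit <(K, PK tag), value of K>.
    yield (key, PK), value


def map_b(record, fk_field):
    # Record from file B (assumed to be a dict): emit <(K, FK tag), record>.
    yield (record[fk_field], FK), record


def shuffle(pairs):
    # Comparator: sort on the full composite key, so PK precedes FK per key.
    ordered = sorted(pairs, key=lambda kv: kv[0])
    # Grouper: group on K only, ignoring the PK/FK tag.
    for k, group in groupby(ordered, key=lambda kv: kv[0][0]):
        yield k, [(composite[1], payload) for composite, payload in group]


def reduce_join(key, tagged_values, fk_field):
    first_tag, value_of_pk = tagged_values[0]
    if first_tag != PK:
        return  # No record in file A for this key; nothing to join against.
    for tag, record in tagged_values[1:]:
        if tag != FK:
            continue  # Assumption: duplicate keys in file A are ignored.
        joined = dict(record)
        joined[fk_field] = value_of_pk  # Replace the foreign key with its value.
        yield key, joined


def run_join(file_a, file_b, fk_field):
    # file_a: iterable of (key, value); file_b: iterable of dict records.
    pairs = [kv for k, v in file_a for kv in map_a(k, v)]
    pairs += [kv for rec in file_b for kv in map_b(rec, fk_field)]
    return [out for k, vals in shuffle(pairs)
            for out in reduce_join(k, vals, fk_field)]


if __name__ == "__main__":
    users = [("u1", "Alice"), ("u2", "Bob")]
    orders = [{"order": 1, "user": "u1"}, {"order": 2, "user": "u2"}]
    for key, record in run_join(users, orders, "user"):
        print(key, record)
```

In a real job the sort and grouping happen inside the framework's shuffle, so the Reducer never holds more than one key's records at a time; the in-memory `sorted` call here is only a stand-in for that.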
Upvotes: 2
Reputation: 198
According to the documentation, the MapReduce framework includes the following steps:
1. Map
2. Sort and partition
3. Combine (optional)
4. Reduce
You've described one way to perform your join: loading all of Set A into memory in each Mapper. You're correct that this is inefficient.
Instead, observe that a large join can be partitioned into arbitrarily many smaller joins if both sets are sorted and partitioned by key. MapReduce sorts the output of each Mapper by key in step (2) above. Sorted Map output is then partitioned by key, so that one partition is created per Reducer. For each unique key, the Reducer will receive all values from both Set A and Set B.
To finish your join, the Reducer needs only to output, for each key, the updated value from Set B if one exists, or otherwise the original value from Set A. To distinguish between values from Set A and Set B, try setting a flag on the output value from the Mapper.
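As a rough illustration of the flagging idea, here is a minimal in-memory Python sketch. The function names (`mapper`, `shuffle`, `reducer`, `update_join`) and the `"A"`/`"B"` flags are hypothetical, and `shuffle` merely imitates the framework's sort and partition step. It also assumes that keys appearing only in Set B should be dropped, since the question asks only to update existing entries in Set A.

```python
from collections import defaultdict


def mapper(source, records):
    # Flag every value with the data set it came from ("A" or "B").
    for key, value in records:
        yield key, (source, value)


def shuffle(mapped):
    # Stand-in for the framework's sort/partition: collect values per key.
    groups = defaultdict(list)
    for key, tagged in mapped:
        groups[key].append(tagged)
    return sorted(groups.items(), key=lambda item: item[0])


def reducer(key, tagged_values):
    a_value = b_value = None
    has_a = has_b = False
    for source, value in tagged_values:
        if source == "A":
            a_value, has_a = value, True
        elif source == "B":
            b_value, has_b = value, True
    if not has_a:
        return  # Assumption: a key found only in Set B is not added to Set A.
    # Prefer the updated value from Set B; otherwise keep Set A's original.
    yield key, (b_value if has_b else a_value)


def update_join(set_a, set_b):
    mapped = list(mapper("A", set_a)) + list(mapper("B", set_b))
    return dict(pair for key, values in shuffle(mapped)
                for pair in reducer(key, values))


if __name__ == "__main__":
    print(update_join([("k1", 1), ("k2", 2)], [("k2", 20)]))
```

Each Reducer call only ever sees the values for a single key, so neither data set has to fit in memory as a whole.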
Upvotes: 3