Reputation: 2033
What about the one-to-many join case, when we have:
file1
personid1, name1
personid2, name2

file2
personid1, address2
personid2, address2
I want the reducer output to be:
personid1, name1, address2
personid2, name2, address2
Upvotes: 1
Views: 1066
Reputation: 4191
I'm assuming that each personid can have only one name but many addresses.
A mapper should scan all your input files and produce key-value pairs like this:
(personid1, (0, name1))
(personid2, (0, name2))
(personid1, (1, address2))
(personid2, (1, address2))
The integer flag 0 denotes that the record came from file1, and the flag 1 denotes that the record came from the other type of file.
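For illustration, a single mapper along these lines could emit those tagged pairs. This is a minimal sketch under my own assumptions: the class name TaggedJoinMapper is made up, the input is comma-separated, and one mapper class reads both files with plain TextInputFormat (so the input split can be cast to a FileSplit to recover the file name).

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TaggedJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Derive the flag from the file name: 0 for the names file, 1 for addresses.
        // (Assumes plain TextInputFormat, where the split is a FileSplit.)
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        String flag = fileName.startsWith("file1") ? "0" : "1";

        // Split "personid, value" on the first comma and emit (personid, "flag,value").
        String[] parts = line.toString().split(",", 2);
        context.write(new Text(parts[0].trim()), new Text(flag + "," + parts[1].trim()));
    }
}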
The reducer input will then be:
(personid1, [(0, name1), (1, address2)])
(personid2, [(1, address2), (0, name2)])
Hadoop can't guarantee the order of the original values in the shuffle output, so I've changed that order in the second line above just to illustrate this. The reducer's job is to decode the value of each record (the list in square brackets): the pair (flag, value) with flag = 0 gives you the name, and all the other pairs give you that person's addresses.
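A matching reducer sketch (again, the class name is mine, not from the answer): it buffers the addresses, remembers the single name record, and emits one joined line per address.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TaggedJoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text personId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = null;
        List<String> addresses = new ArrayList<>();

        // The values arrive in no particular order, so buffer the addresses
        // until the single flag-0 (name) record shows up.
        for (Text value : values) {
            String[] parts = value.toString().split(",", 2);
            if ("0".equals(parts[0])) {
                name = parts[1];
            } else {
                addresses.add(parts[1]);
            }
        }

        // Emit (personid, "name, address") once per address.
        for (String address : addresses) {
            context.write(personId, new Text(name + ", " + address));
        }
    }
}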
Enjoy Hadoop!
Upvotes: 1
Reputation: 736
It seems like you can use personid as the key from your mappers.
Then you'll be sure to get all records belonging to one personid in one reducer as an iterator. You'll also need to distinguish which record comes from which source, so it's better to put an identifier on the value. A driver that wires a separate mapper to each input with MultipleInputs could look like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExampleDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        Job job = Job.getInstance(configuration, this.getClass().getName());
        job.setJarByClass(ExampleDriver.class);

        // One mapper per source: args[0] is the person input dir,
        // args[1] the address input dir, args[2] the output dir.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PersonMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AddressMapper.class);

        job.setMapOutputKeyClass(KeyWithPersonId.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(JoinPersonWithAddressReducer.class);
        job.setOutputKeyClass(Text.class);   // Assuming the reducer emits Text keys
        job.setOutputValueClass(Text.class); // and Text values.

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); // Not necessary; a plain TextOutputFormat works too.
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new ExampleDriver(), args);
        System.exit(exitCode);
    }
}
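The driver references PersonMapper, AddressMapper, KeyWithPersonId and JoinPersonWithAddressReducer, which aren't shown in the answer. As a rough idea of what the composite key might look like (my sketch, not the answerer's code): a personid plus a source tag, so records can be sorted with the person record ahead of the address records.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class KeyWithPersonId implements WritableComparable<KeyWithPersonId> {

    private String personId = "";
    private int source; // 0 = person file, 1 = address file

    public KeyWithPersonId() {
        // No-arg constructor required by Hadoop for deserialization.
    }

    public void set(String personId, int source) {
        this.personId = personId;
        this.source = source;
    }

    public String getPersonId() {
        return personId;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(personId);
        out.writeInt(source);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        personId = in.readUTF();
        source = in.readInt();
    }

    @Override
    public int compareTo(KeyWithPersonId other) {
        // Sort by personid first, then by source, so the name record
        // precedes the address records of the same person.
        int cmp = personId.compareTo(other.personId);
        return cmp != 0 ? cmp : Integer.compare(source, other.source);
    }

    @Override
    public int hashCode() {
        // Partition on personid only, so both record types of one person
        // end up on the same reducer.
        return personId.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof KeyWithPersonId)) {
            return false;
        }
        KeyWithPersonId other = (KeyWithPersonId) obj;
        return personId.equals(other.personId) && source == other.source;
    }
}

Note that to actually get the flag-0 and flag-1 records into one reduce call, the job would also need a Partitioner and a grouping comparator that compare only personId; the compareTo above then orders the name record ahead of the addresses within each group (the usual secondary-sort pattern).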
Let me know if you have more questions.
Upvotes: 1