proutray

Reputation: 2033

How do I perform a one-to-many map-reduce join?

What about the one-to-many join case when we have:

file1

personid1, name1

personid2, name2

file2

personid1, address2

personid2, address2

I want the reducer output to be:

personid1, name1, address2

personid2, name2, address2

Upvotes: 1

Views: 1066

Answers (2)

HEKTO

Reputation: 4191

I'm assuming that each personid can have only one name but many addresses.

A mapper should scan all your input files and produce key-value pairs like this:

(personid1, (0, name1))
(personid2, (0, name2))
(personid1, (1, address2))
(personid2, (1, address2))

The integer flag 0 denotes that the record came from file1, and the flag 1 denotes that it came from the other type of file.
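To make this concrete, here's a minimal sketch of such a mapper in Java (the class name TaggingMapper and the file-name test are my own illustration, not part of the answer). It assumes comma-separated lines and encodes the (flag, value) pair as a tab-separated Text value, deriving the flag from the name of the file the current split belongs to:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TaggingMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Pick the flag from the file this split belongs to: 0 for the
    // name file, 1 for everything else (here, the address file).
    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    String flag = fileName.startsWith("file1") ? "0" : "1";
    // "personid1, name1" -> key: personid1, value: "0<TAB>name1"
    String[] parts = line.toString().split(",", 2);
    context.write(new Text(parts[0].trim()),
                  new Text(flag + "\t" + parts[1].trim()));
  }
}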

A reducer input will be:

(personid1, [(0, name1), (1, address2)])
(personid2, [(1, address2), (0, name2)])

Hadoop doesn't guarantee the order of the original values in the shuffle output, so I've reversed the order in the second line above to illustrate this. The reducer's job is to decode the values of each record (the list in square brackets): the pair (flag, value) with flag = 0 gives you the name, and all the other pairs give you the addresses of this person.
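A matching reducer sketch, under the same assumptions (each Text value carries a tab-separated flag and payload; the name JoinReducer is mine, not from the answer):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text personId, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    String name = null;  // exactly one per personid, per the assumption above
    List<String> addresses = new ArrayList<String>();
    for (Text value : values) {
      String[] tagged = value.toString().split("\t", 2);
      if ("0".equals(tagged[0])) {
        name = tagged[1];            // flag 0 carries the name
      } else {
        addresses.add(tagged[1]);    // flag 1 carries an address
      }
    }
    // One joined output line per address: personid1  name1, address2
    for (String address : addresses) {
      context.write(personId, new Text(name + ", " + address));
    }
  }
}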

Enjoy Hadoop!

Upvotes: 1

Dhruv Kapur

Reputation: 736

It seems like you can use personid as the key emitted from your mappers.

Then you'll be sure to get all records belonging to one personid in a single reducer call, as an iterator. You'll also need to distinguish which record came from which source, so it's better to tag each value with an identifier.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExampleDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Configuration configuration = getConf();

    Job job = Job.getInstance(configuration, this.getClass().getName());

    job.setJarByClass(ExampleDriver.class);

    // args[0] = person input dir, args[1] = address input dir, args[2] = output dir.
    // PersonMapper and AddressMapper are your own mapper classes, one per input source.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PersonMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AddressMapper.class);

    // KeyWithPersonId is your custom WritableComparable wrapping the person id;
    // a plain Text key works too if you don't need secondary sorting.
    job.setMapOutputKeyClass(KeyWithPersonId.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(JoinPersonWithAddressReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); // Not necessary. Plain TextOutputFormat works too.
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new ExampleDriver(), args);
    System.exit(exitCode);
  }
}
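The driver above references mapper and reducer classes the answer leaves to the reader. Here's one possible sketch of the two mappers, assuming comma-separated input and a plain Text person-id key in place of the custom KeyWithPersonId (the driver would then use job.setMapOutputKeyClass(Text.class)); the reducer would mirror the JoinReducer sketched in the previous answer, keyed on the P/A tag instead of 0/1:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Tags each person record with a "P" marker:
// "personid1, name1" -> (personid1, "P<TAB>name1")
public class PersonMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String[] parts = line.toString().split(",", 2);
    context.write(new Text(parts[0].trim()), new Text("P\t" + parts[1].trim()));
  }
}

// Same idea for addresses, with an "A" marker (goes in its own .java file).
public class AddressMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    String[] parts = line.toString().split(",", 2);
    context.write(new Text(parts[0].trim()), new Text("A\t" + parts[1].trim()));
  }
}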

Let me know if you have more questions.

Upvotes: 1
