SkogensKonung

Reputation: 663

How to combine the results of two separate mappers in Hadoop?

I would like to combine the results of two separate mappers, and then execute the reducer on the combined results.

I have two files. The first file has the following columns: A, B, C. The second file: A, D. Both mappers have the same signature: Mapper&lt;LongWritable, Text, LongWritable, Text&gt;. The output of the first mapper is KEY: new LongWritable(A) and VALUE: new Text(B, C) if a specific condition is met. The output of the second is KEY: new LongWritable(A) and VALUE: new Text(D) if another condition is met.

Now, when I iterate over the Iterable&lt;Text&gt; in my reducer, each value is either B+C or D. Given that the two key sets intersect, how can I obtain B, C, D together for a given A in the reducer?

Upvotes: 0

Views: 868

Answers (1)

badger

Reputation: 3246

I had a similar use case. I solved it by tagging the values emitted by one of the mappers, so that in the reducer I can tell which file (fileA or fileB) each record came from and separate them accordingly.
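Stripped of the Hadoop plumbing, the idea is just tag-and-separate followed by a cross product per key. A minimal plain-Java sketch (class and method names are mine, purely illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class TagJoinSketch {

    // Simulates what the reducer sees for one key: a mixed list of values,
    // where values from fileA carry a "$" tag and values from fileB do not.
    public static List<String> join(String key, List<String> values) {
        List<String> left = new ArrayList<>();   // fileA values (tag stripped)
        List<String> right = new ArrayList<>();  // fileB values
        for (String v : values) {
            if (v.startsWith("$")) {
                left.add(v.substring(1));
            } else {
                right.add(v);
            }
        }
        // Emit the cross product of the two sides for this key
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(key + " " + l + " " + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mixed values for key "A": one tagged fileA record, two fileB records
        List<String> mixed = List.of("$B C", "ALICE", "BOB");
        join("A", mixed).forEach(System.out::println);
        // prints:
        // A B C ALICE
        // A B C BOB
    }
}
```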

suppose fileA is like:

A B C
C D D
A D D
A X Y

and fileB is like:

A ALICE
C BOB
A ALICE
A BOB

I wrote the mappers like this (note that I prepend a dollar sign to each value from fileA; this tag is what the reducer uses to tell the two sources apart):

public static class FileAMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Values from fileA are tagged with this prefix so the reducer can identify them
    private String specifier = "$";

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split(" ");
        String k = parts[0];
        String v = specifier + parts[1] + " " + parts[2];
        context.write(new Text(k), new Text(v));
    }
}

next mapper:

public static class FileBMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split(" ");
        String k = parts[0];
        // Values from fileB are written untagged
        String v = parts[1];
        context.write(new Text(k), new Text(v));
    }
}

Now the reducer. I defined two ArrayLists to separate the values based on the dollar-sign tag added in the mapper:

public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> left = new ArrayList<>();   // values from fileA (tag stripped)
        List<String> right = new ArrayList<>();  // values from fileB
        values.forEach(e -> {
            String temp = e.toString();
            if (temp.startsWith("$")) {
                left.add(temp.substring(1));
            } else {
                right.add(temp);
            }
        });

        // Cross product: print every (fileA, fileB) combination for this key
        left.forEach(l ->
                right.forEach(r ->
                        System.out.println(String.format("%s %s %s", key.toString(), l, r))));
    }
}

result (printed to the tasks' standard output):

A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB
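Note that System.out.println sends these rows to the task's standard output (visible in the container logs), not to the job's output directory. If you want them in the part files under "output", you could emit through the context instead; a sketch of the replacement for the print loop (untested here, drop-in for the body above):

```java
// Inside reduce(): write each joined row to the job output instead of stdout.
// The key stays as-is; the value holds the joined fileA and fileB columns.
for (String l : left) {
    for (String r : right) {
        context.write(key, new Text(l + " " + r));
    }
}
```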

driver:

Job job = Job.getInstance(new Configuration());
job.setJarByClass(Main.class);

Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath = new Path("output");

// Each input path gets its own mapper class
MultipleInputs.addInputPath(job, fileA, TextInputFormat.class, FileAMapper.class);
MultipleInputs.addInputPath(job, fileB, TextInputFormat.class, FileBMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.waitForCompletion(true);

Upvotes: 1
