I would like to combine the results of two separate mappers, and then execute the reducer on the combined results.
I have two files. The first file has the following columns: A, B, C. The second file: A, D.
Both mappers have the same signature: Mapper<LongWritable, Text, LongWritable, Text>. If a specific condition is met, the first mapper outputs KEY: new LongWritable(A) and VALUE: new Text(B, C). If another condition is met, the second mapper outputs KEY: new LongWritable(A) and VALUE: new Text(D).
When I output the values from the Iterable<Text> in my reducer, I obtain either B+C or D.
Given that the two files share some values of A, how can I obtain B, C and D together for a given A in the reducer?
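To see what the reducer actually receives here, the shuffle can be modelled in plain Java. This is a simplified illustration, not Hadoop code: the class ShuffleModel and its shuffle method are hypothetical, keys are Strings instead of LongWritable, and a TreeMap stands in for the framework's sort-and-group step. It shows that for a key present in both files, a single reduce() call gets the "B,C" value and the "D" value in the same Iterable, as separate elements, so the reducer has to collect them before it can combine them. In real Hadoop the order of values within a key is not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of Hadoop's shuffle (illustration only, not Hadoop code).
public class ShuffleModel {

    // Groups (key, value) pairs from all mappers by key. The TreeMap keeps keys
    // sorted, roughly like the sort step before reduce() is called once per key.
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = new ArrayList<>();
        mapOutput.add(new String[] {"1", "b1,c1"}); // emitted by the first mapper
        mapOutput.add(new String[] {"1", "d1"});    // emitted by the second mapper
        mapOutput.add(new String[] {"2", "b2,c2"}); // key present only in the first file
        // Each key's values arrive together; each element is either "B,C" or "D".
        // Prints "1 -> [b1,c1, d1]" then "2 -> [b2,c2]".
        shuffle(mapOutput).forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```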
I had a similar use case. I solved it by having one mapper prefix its values with a marker token, so that the reducer can tell which file (fileA or fileB) each record came from and separate the two groups.
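A minimal sketch of the tagging idea in plain Java, assuming a variant where both inputs are tagged rather than only one. The class TaggedValues and the tags "L|" and "R|" are hypothetical. Tagging both sides means a fileB value that happens to start with the marker cannot be mistaken for a fileA value, which can happen when only one side carries a marker.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of tagging values from BOTH inputs; tags "L|" and "R|" are hypothetical.
public class TaggedValues {
    static final String TAG_LEFT = "L|";  // would be prepended by the fileA mapper
    static final String TAG_RIGHT = "R|"; // would be prepended by the fileB mapper

    // Mapper side: mark a value with the tag of the file it came from.
    static String tag(String tag, String value) {
        return tag + value;
    }

    // Reducer side: route each value to the list for its source, stripping the tag.
    static void split(Iterable<String> values, List<String> left, List<String> right) {
        for (String v : values) {
            if (v.startsWith(TAG_LEFT)) {
                left.add(v.substring(TAG_LEFT.length()));
            } else if (v.startsWith(TAG_RIGHT)) {
                right.add(v.substring(TAG_RIGHT.length()));
            } else {
                throw new IllegalArgumentException("untagged value: " + v);
            }
        }
    }

    public static void main(String[] args) {
        // "$5" stands in for a fileB value that happens to start with '$'.
        List<String> values = List.of(tag(TAG_LEFT, "B C"), tag(TAG_RIGHT, "$5"), tag(TAG_LEFT, "X Y"));
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        split(values, left, right);
        System.out.println(left + " " + right); // [B C, X Y] [$5]
    }
}
```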
Suppose fileA looks like this:
A B C
C D D
A D D
A X Y
and fileB looks like this:
A ALICE
C BOB
A ALICE
A BOB
I wrote the mappers like this (note that FileAMapper adds a dollar sign at the start of each value; the reducer uses this dollar sign to tell the two sources apart):
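public static class FileAMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Marker prepended to every fileA value so the reducer can tell the sources apart
    private static final String SPECIFIER = "$";

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Expected line format: "A B C" (single-space separated)
        String[] parts = value.toString().split(" ");
        if (parts.length < 3) {
            return; // skip blank or malformed lines instead of throwing
        }
        context.write(new Text(parts[0]), new Text(SPECIFIER + parts[1] + " " + parts[2]));
    }
}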
The second mapper writes its values without a marker:
public static class FileBMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Expected line format: "A D" (single-space separated)
        String[] parts = value.toString().split(" ");
        if (parts.length < 2) {
            return; // skip blank or malformed lines instead of throwing
        }
        context.write(new Text(parts[0]), new Text(parts[1]));
    }
}
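The parsing in the two mappers can be pulled out into pure functions so it can be checked without a cluster. The class JoinParsing and its methods are hypothetical; they mirror the split-on-space logic of the mappers and return null for lines with too few fields, since with the unguarded version a blank line makes parts[1] throw ArrayIndexOutOfBoundsException.

```java
// Hypothetical helper: the parsing from FileAMapper and FileBMapper as pure functions.
public class JoinParsing {
    static final String SPECIFIER = "$"; // same marker FileAMapper prepends

    // "A B C" -> {"A", "$B C"}; returns null if the line has fewer than 3 fields.
    static String[] parseFileA(String line) {
        String[] parts = line.split(" ");
        if (parts.length < 3) {
            return null;
        }
        return new String[] {parts[0], SPECIFIER + parts[1] + " " + parts[2]};
    }

    // "A ALICE" -> {"A", "ALICE"}; returns null if the line has fewer than 2 fields.
    static String[] parseFileB(String line) {
        String[] parts = line.split(" ");
        if (parts.length < 2) {
            return null;
        }
        return new String[] {parts[0], parts[1]};
    }

    public static void main(String[] args) {
        String[] a = parseFileA("A X Y");
        String[] b = parseFileB("C BOB");
        System.out.println(a[0] + " -> " + a[1]); // A -> $X Y
        System.out.println(b[0] + " -> " + b[1]); // C -> BOB
        System.out.println(parseFileA(""));       // null
    }
}
```

Inside map() this would be used as: String[] kv = JoinParsing.parseFileA(value.toString()); if (kv != null) context.write(new Text(kv[0]), new Text(kv[1]));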
Now the reducer. It uses two ArrayLists to separate the values according to the dollar sign added in FileAMapper:
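public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> left = new ArrayList<>();  // values from fileA (marked with "$")
        List<String> right = new ArrayList<>(); // values from fileB (unmarked)
        for (Text e : values) {
            // Hadoop reuses the same Text instance, so copy it to a String before storing
            String temp = e.toString();
            if (temp.startsWith("$")) {
                left.add(temp.substring(1)); // strip the marker
            } else {
                right.add(temp);
            }
        }
        // Cross product: one output record per (fileA value, fileB value) pair.
        // Plain loops, because context.write throws checked exceptions that a
        // forEach lambda cannot propagate; writing to the context (not System.out)
        // sends the records to the job's output files.
        for (String l : left) {
            for (String r : right) {
                context.write(key, new Text(l + " " + r));
            }
        }
    }
}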
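To check the join logic without running Hadoop, here is an in-memory model of the whole job: map both files, group by key (a TreeMap standing in for the shuffle), then split on the "$" marker and take the cross product. ReduceSideJoinModel and join are hypothetical names. Here the value order within a key follows input order, whereas Hadoop does not guarantee it. The model also makes two properties of this approach visible: keys that appear in only one file produce no output (inner-join semantics), and both value lists for a key are held in memory at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the reduce-side join (illustration only, not Hadoop code).
public class ReduceSideJoinModel {

    static List<String> join(List<String> fileA, List<String> fileB) {
        // "Map" phase plus grouping; TreeMap gives sorted keys like the shuffle.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : fileA) {
            String[] p = line.split(" ");
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add("$" + p[1] + " " + p[2]);
        }
        for (String line : fileB) {
            String[] p = line.split(" ");
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        // "Reduce" phase: separate by marker, then emit the cross product per key.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            List<String> left = new ArrayList<>();
            List<String> right = new ArrayList<>();
            for (String v : e.getValue()) {
                if (v.startsWith("$")) {
                    left.add(v.substring(1));
                } else {
                    right.add(v);
                }
            }
            for (String l : left) {
                for (String r : right) {
                    out.add(e.getKey() + " " + l + " " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> fileA = List.of("A B C", "C D D", "A D D", "A X Y");
        List<String> fileB = List.of("A ALICE", "C BOB", "A ALICE", "A BOB");
        join(fileA, fileB).forEach(System.out::println);
    }
}
```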
Result of running the job on the sample files:
A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB
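The number of result lines follows from the cross product in the reducer. Writing L_k and R_k (notation introduced here) for the fileA and fileB values that share key k, the reducer emits |L_k| · |R_k| lines for that key:

$$
\begin{aligned}
\text{rows}(A) &= |L_A| \cdot |R_A| = 3 \cdot 3 = 9 \\
\text{rows}(C) &= |L_C| \cdot |R_C| = 1 \cdot 1 = 1 \\
\text{total} &= 9 + 1 = 10
\end{aligned}
$$

Because fileB contains the line A ALICE twice, each fileA row for A is paired with ALICE twice; if that is not wanted, fileB would need to be deduplicated first.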
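And the driver:
Configuration conf = new Configuration();
// Job.getInstance replaces the deprecated Job(Configuration) constructor
Job job = Job.getInstance(conf, "reduce-side join");
job.setJarByClass(Main.class);
Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath = new Path("output");
// Each input path is read by its own mapper; both emit <Text, Text>
MultipleInputs.addInputPath(job, fileA, TextInputFormat.class, FileAMapper.class);
MultipleInputs.addInputPath(job, fileB, TextInputFormat.class, FileBMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);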