mikmak

Reputation: 13

Hadoop MapReduce distinct pattern with custom Writable produces duplicate keys

I'm trying to implement the distinct pattern:

map(key, record):
  emit record, null
reduce(key, records):
  emit key

My key is a complex, custom Writable. If, in the reducer, I emit the key together with its hash code:

context.write(key, new IntWritable(key.hashCode()));

I receive the following output:

key1 -1808937256
key2 -768063202
key3 906064410
key2 -768063202
key3 906064410

In theory, the output should contain only key1, key2, and key3, since I'm using the default HashPartitioner: keys with equal hash codes end up in the same partition. That is clearly not what happens here.
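
For reference, the logic of the default partitioner is essentially the following (paraphrased from org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Keys with equal hash codes are assigned to the same partition.
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}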

If I convert my complex Writable into a Text object (adapting the Mapper/Reducer classes accordingly) and emit in the Mapper:

 context.write(new Text(key.toString()), NullWritable.get());

... the output is as expected:

key1 1013632023
key2 762485389
key3 -1193948769

Here is a minimal working example that illustrates the behavior.

Input:

A A A A A
B B B B B
C C C C C
A A A A A
B B B B B

The MapReduce job:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class DistinctPattern extends Configured implements Tool {
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> {


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        ComplexObject o = new ComplexObject(value.toString());
        context.write(o, NullWritable.get());
    }
}

public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> {


    public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {

        context.write(key, new IntWritable(key.hashCode()));
    }
}

public static class MyArrayWritable extends ArrayWritable {

    public MyArrayWritable(Writable[] values) {
        super(DatumObject.class, values);
    }

    public MyArrayWritable() {
        super(DatumObject.class);
    }

    @Override
    public String toString() {
        return Arrays.toString(get());
    }

}

public static class DatumObject implements Writable {
    private String datum;

    public DatumObject() {}

    public DatumObject(String d) {
        datum = d;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        datum = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(datum);    
    }

    @Override
    public String toString() {
        return datum;
    }

    @Override
    public int hashCode() {
        return 31 * datum.hashCode();
    }

}

public static class ComplexObject implements WritableComparable<ComplexObject> {
    private List<DatumObject> data = new ArrayList<>();

    public ComplexObject() {}

    public ComplexObject(String d) {
        String[] elements = d.split(" ");
        for(int i = 0; i < elements.length; i++)
            data.add(new DatumObject(elements[i]));
    }

    public int size() {
        return data.size();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        data.clear();
        MyArrayWritable m = new MyArrayWritable();
        m.readFields(in);
        Writable[] w = m.get();
        for(int i = 0; i < w.length; i++)
            data.add((DatumObject) w[i]);

    }

    @Override
    public void write(DataOutput out) throws IOException {
        MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()]));
        m.write(out);
    }

    @Override
    public int compareTo(ComplexObject o) {
        if(this.equals(o))
            return 0;

        if(o.size() < this.size())
            return -1;

        return 1;
    }

    @Override
    public boolean equals(Object obj) {
        if(!(obj instanceof ComplexObject))
            return false;

        ComplexObject other = (ComplexObject) obj;
        return other.data.equals(data);
    }

    @Override
    public int hashCode() {
        return 31 * data.hashCode();
    }

    @Override
    public String toString() {
        StringBuilder s = new StringBuilder();
        data.forEach( entry -> {
            s.append(entry); 
            s.append(" ");
        });

        return s.toString();
    }

}

@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance();
    job.setJar("distinct.jar");
    job.setJarByClass(DistinctPattern.class);
    job.setMapperClass(DistinctMapper.class);
    job.setReducerClass(DistinctReducer.class);
    job.setMapOutputKeyClass(ComplexObject.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(ComplexObject.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {       
    int exitCode = ToolRunner.run(new DistinctPattern(), args);
    System.exit(exitCode);
}
}

Expected output:

A A A A A       368623362
B B B B B       1285710467
C C C C C       -2092169724

Actual output:

A A A A A       368623362
B B B B B       1285710467
C C C C C       -2092169724
A A A A A       368623362
B B B B B       1285710467

What am I missing?

PS: Hadoop 2.7.3

Upvotes: 0

Views: 416

Answers (1)

mikmak

Reputation: 13

OK, I found the mistake(s) in my code. First, the minimal working example lacks an implementation of the equals method in class DatumObject:

@Override
public boolean equals(Object obj) {
    if(obj == null)
        return false;

    if(!(obj instanceof DatumObject))
        return false;

    DatumObject other = (DatumObject) obj;
    return other.datum.equals(datum);
}

Second, there is an aspect I could not reproduce in the minimal working example but which appears in my actual code: not all of my key classes implemented the WritableComparable interface. As a result, I suspect the shuffle phase did not sort and group the keys as expected, since grouping in the reducer is driven by the keys' compareTo, which must return 0 for logically equal keys. Once the compareTo methods were correctly implemented in all classes composing my key value (see class diagram here), the distinct pattern worked as expected.
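
For illustration, here is a sketch of what a compareTo consistent with equals could look like for the ComplexObject class from the question. The ordering criteria (size first, then element-wise comparison of the datum strings via toString) are my own assumption; the essential property is only that it returns 0 exactly when two keys are logically equal:

@Override
public int compareTo(ComplexObject o) {
    // Order by size first (an arbitrary but consistent choice)...
    int sizeDiff = Integer.compare(this.size(), o.size());
    if (sizeDiff != 0)
        return sizeDiff;

    // ...then element by element, so 0 is returned only for equal keys.
    for (int i = 0; i < data.size(); i++) {
        int cmp = data.get(i).toString().compareTo(o.data.get(i).toString());
        if (cmp != 0)
            return cmp;
    }
    return 0;
}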

Upvotes: 0
