Mozart

Reputation: 205

Map/Reduce: How to output a HashMap after completion?

I want to implement the DPC algorithm (clustering by fast search and find of density peaks). It is a big job, so I decided to start with the calculation of rho.

Here is the map method:

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    String[] lineSplit = line.split(" ");
    if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
        IntWritable one = new IntWritable(
                Integer.parseInt(lineSplit[0]));
        IntWritable two = new IntWritable(
                Integer.parseInt(lineSplit[1]));
        context.write(one, two);
    }
}

Here is the reducer:

public void reduce(IntWritable key, IntWritable values, Context context)
        throws IOException, InterruptedException {
    int[] indexs = new int[2];
    indexs[0] = Integer.parseInt(key.toString());
    indexs[1] = Integer.parseInt(values.toString());
    for (int i = 0; i < indexs.length; i++) {
        densityCountMap.put(indexs[i],
                densityCountMap.get(indexs[i]) + 1);
    }
}

Problem

densityCountMap is a hash map whose contents are correct only after all records have been processed. How can I output densityCountMap once the job has completed?

---------Solution---------

Thanks to mbaxi, who inspired this solution by pointing out that the reduce definition is incorrect and that densityCountMap is not necessary.

I should have made it clearer that the goal is to increment the counts of both lineSplit[0] and lineSplit[1] whenever lineSplit[2] is below a certain threshold value. In fact, there is no need to override cleanup.

Mapper:

public static class TokenizerMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private final static IntWritable count = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] lineSplit = line.split(" ");
        if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
            IntWritable one = new IntWritable(
                    Integer.parseInt(lineSplit[0]));
            IntWritable two = new IntWritable(
                    Integer.parseInt(lineSplit[1]));
            context.write(one, count); // both indices are emitted as keys,
            context.write(two, count); // so the count of each one is increased
        }
    }
}

Reducer:

public static class IntSumReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // The framework already groups the values by key, so a separate
        // densityCountMap is redundant.
        result.set(sum);
        // Writing (key, sum) is equivalent to outputting densityCountMap.
        context.write(key, result);
    }
}
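
For anyone who wants to check the logic of the mapper and reducer above without a cluster, the same flow can be imitated in plain Java. This is only a rough sketch under my own assumptions: the class name RhoLocalSketch, the method computeRho, the sample lines and the threshold 0.5 are all made up for illustration, and a TreeMap stands in for Hadoop's shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the job; no Hadoop classes are used.
class RhoLocalSketch {

    // Each input line is "i j distance", the same layout the mapper parses.
    static Map<Integer, Integer> computeRho(List<String> lines, double dcThreshold) {
        // "Map" phase: emit (i, 1) and (j, 1) for every pair below the threshold.
        // Grouping by key in a TreeMap plays the role of the shuffle.
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] lineSplit = line.split(" ");
            if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
                int one = Integer.parseInt(lineSplit[0]);
                int two = Integer.parseInt(lineSplit[1]);
                grouped.computeIfAbsent(one, k -> new ArrayList<>()).add(1);
                grouped.computeIfAbsent(two, k -> new ArrayList<>()).add(1);
            }
        }
        // "Reduce" phase: sum the values collected for each key.
        Map<Integer, Integer> rho = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int val : entry.getValue()) {
                sum += val;
            }
            rho.put(entry.getKey(), sum);
        }
        return rho;
    }

    public static void main(String[] args) {
        // Made-up sample data: only the first two pairs are below 0.5.
        List<String> lines = Arrays.asList("1 2 0.3", "1 3 0.4", "2 3 0.9");
        System.out.println(computeRho(lines, 0.5)); // prints {1=2, 2=1, 3=1}
    }
}
```

Point 1 has two neighbours within the threshold, points 2 and 3 have one each, which is what the real job should write as (key, sum) pairs.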

Thanks again; you gave me not just help but inspiration.

Upvotes: 1

Views: 2507

Answers (1)

mbaxi

Reputation: 1301

You can override the cleanup(Context context) method: keep populating your densityCountMap in the reduce() method, then write its contents out in cleanup(Context context).

cleanup() is invoked once per task, after all of that task's rows have been processed.

---Edit as requested in comments section---

If you are using the Eclipse editor, right-click on the Reducer class that you are extending and choose Source -> Override/Implement Methods; otherwise, you can look up the Javadocs.

private static class RhoCalculationReducer extends Reducer<Text,Text,Text,Text> {
}

There you will see the following list of methods (note that the parameter types may differ depending on your class definition):

cleanup(Context)
reduce(Text, Iterable<Text>, Context)
run(Context)
setup(Context)

Your reduce() and map() functions are actually overridden implementations, in which you provide your own processing logic. setup() and cleanup() can be thought of as the constructor and destructor, respectively, of the map or reduce task: setup() is called before the map or reduce task starts, and cleanup() is called at the end of the task.
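
To make that call order concrete, here is a rough plain-Java imitation of it. It is not Hadoop code: LifecycleSketch, its run() method and the sample data are hypothetical, and the output list merely stands in for context.write(). It accumulates into a densityCountMap in reduce() and only writes the map out in cleanup().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical imitation of a reduce task's lifecycle; no Hadoop classes are used.
class LifecycleSketch {
    private final Map<Integer, Integer> densityCountMap = new LinkedHashMap<>();
    final List<String> output = new ArrayList<>(); // stands in for context.write()

    // Called once, before the first key.
    void setup() {
        densityCountMap.clear();
    }

    // Called once per key with all values for that key; nothing is written yet.
    void reduce(int key, Iterable<Integer> values) {
        int sum = 0;
        for (int val : values) {
            sum += val;
        }
        densityCountMap.merge(key, sum, Integer::sum);
    }

    // Called once, after the last key: flush the accumulated map.
    void cleanup() {
        for (Map.Entry<Integer, Integer> entry : densityCountMap.entrySet()) {
            output.add(entry.getKey() + "\t" + entry.getValue());
        }
    }

    // Drives the three hooks in the order described above.
    void run(Map<Integer, List<Integer>> grouped) {
        setup();
        for (Map.Entry<Integer, List<Integer>> entry : grouped.entrySet()) {
            reduce(entry.getKey(), entry.getValue());
        }
        cleanup();
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> grouped = new LinkedHashMap<>();
        grouped.put(1, Arrays.asList(1, 1)); // made-up sample data
        grouped.put(2, Arrays.asList(1));
        LifecycleSketch task = new LifecycleSketch();
        task.run(grouped);
        System.out.println(task.output);
    }
}
```

Keep in mind that the map lives in memory for the whole task, so this pattern only suits cases where the number of distinct keys per task is small enough to fit.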

I also see that your reduce definition is incorrect: instead of "IntWritable values" it should be "Iterable<IntWritable> values". The framework ensures that all values for a single key are processed by a single reducer, which is why the signature takes a key and an iterable of values. You probably also want to aggregate the records for each key together, in which case you may not need an additional densityCountMap, since the reducer already receives all values for a given key in one go.
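
The reason the wrong signature goes unnoticed is a general Java point: a method whose parameter types differ from the base method is an overload, not an override, so the framework keeps calling the base version. The sketch below shows this in plain Java. BaseReducer is a hypothetical stand-in whose reduce() just passes every value through, which is also what Hadoop's default Reducer.reduce() does; all other names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverloadSketch {

    // Hypothetical stand-in for a framework base class with an identity reduce.
    static class BaseReducer {
        final List<String> output = new ArrayList<>();

        void reduce(Integer key, Iterable<Integer> values) {
            for (Integer val : values) {
                output.add(key + "\t" + val);
            }
        }
    }

    // Wrong parameter type: this only adds an overload, so the base reduce still runs.
    static class WrongReducer extends BaseReducer {
        void reduce(Integer key, Integer value) {
            output.add("never reached through the framework call");
        }
    }

    // Matching signature: @Override makes the compiler verify that it really overrides.
    static class SumReducer extends BaseReducer {
        @Override
        void reduce(Integer key, Iterable<Integer> values) {
            int sum = 0;
            for (int val : values) {
                sum += val;
            }
            output.add(key + "\t" + sum);
        }
    }

    // The "framework" only knows the base signature and always calls that one.
    static List<String> drive(BaseReducer reducer) {
        reducer.reduce(1, Arrays.asList(1, 1));
        return reducer.output;
    }

    public static void main(String[] args) {
        System.out.println(drive(new WrongReducer())); // two pass-through records
        System.out.println(drive(new SumReducer()));   // one summed record
    }
}
```

Putting @Override on your reduce() is a cheap safeguard: with the wrong parameter types the code would then fail to compile instead of silently doing nothing.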

Upvotes: 3
