zelda26
zelda26

Reputation: 489

Java Hadoop MapReduce Chaining Job

I have some code that correctly selects the source and the highest weight, but I can't seem to pull in the target column as well. Could someone point me in the right direction? I've never used Java before. I think the reducer function needs to return a tuple. If so, does the variable targets in the mapper function need to hold this tuple?

Desired output: each line contains a node ID, followed by a tab (\t), and the expected “tgt,weight” tuple. The tuple is the tgt with the highest weight. In the event of a tie, return the tgt with the lowest number.

INPUT

src        tgt        weight

1        110        3

1        200        1

20        150        30

10        110        10

11        130        15

11        200        67

1        70        3

EXPECTED OUTPUT

1        70,3

20        150,30

10        110,10

11        200,67

CURRENT OUTPUT (need to add in the tgt column as tuple)

1        3

20        30

10        10

11        67
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Q1 {


  public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {

      private Text target = new Text();
      public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                target.set(edge[0]);
                context.write(target, new IntWritable(Integer.parseInt(edge[2])));
            }
        }

    }

  public static class EmailsReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable totalCount = new IntWritable();  
      public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException{

            int max = 0;

            for (IntWritable target : targets)  {
                if(target.get() > max || max ==0) {
                    max = target.get();
                }
            }

            totalCount.set(max);

            context.write(key, totalCount);


        }
    }




    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Upvotes: 1

Views: 548

Answers (1)

Ambrish
Ambrish

Reputation: 3677

You are interested in custom output. To achieve that, try implementing a custom WritableComparable. You may have to update your logic to make it work according to your needs.

Something like:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * A {@code (tgt, weight)} pair usable as a Hadoop value (or key).
 *
 * <p>Serialization writes {@code tgt} followed by {@code weight}; ordering
 * compares {@code tgt} first and {@code weight} second. The text form is
 * {@code tgt,weight}.
 */
public class MyWritable implements WritableComparable<MyWritable> {
    private IntWritable tgt;
    private IntWritable weight;

    /** Creates a pair backed by two fresh, default-constructed {@link IntWritable}s. */
    public MyWritable() {
        set(new IntWritable(), new IntWritable());
    }

    /**
     * Creates a pair from primitive values.
     *
     * @param tgt    the target node id
     * @param weight the edge weight
     */
    public MyWritable(int tgt, int weight) {
        set(new IntWritable(tgt), new IntWritable(weight));
    }

    /**
     * Creates a pair that holds the given writables by reference.
     *
     * @param tgt    the target node id
     * @param weight the edge weight
     */
    public MyWritable(IntWritable tgt, IntWritable weight) {
        set(tgt, weight);
    }

    /** @return the target node id holder */
    public IntWritable getTgt() {
        return tgt;
    }

    /** @return the edge weight holder */
    public IntWritable getWeight() {
        return weight;
    }

    /**
     * Replaces both components. The arguments are stored by reference, not copied.
     *
     * @param tgt    the new target node id
     * @param weight the new edge weight
     */
    public void set(IntWritable tgt, IntWritable weight) {
        this.tgt = tgt;
        this.weight = weight;
    }

    /**
     * Orders by {@code tgt}, breaking ties by {@code weight}.
     */
    @Override
    public int compareTo(MyWritable o) {
        int cmp = tgt.compareTo(o.tgt);
        if (cmp == 0) {
            return weight.compareTo(o.weight);
        }
        return cmp;
    }

    /** Serializes {@code tgt} then {@code weight}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        tgt.write(dataOutput);
        weight.write(dataOutput);
    }

    /** Deserializes in the same order as {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        tgt.readFields(dataInput);
        weight.readFields(dataInput);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyWritable that = (MyWritable) o;
        return Objects.equals(tgt, that.tgt) &&
                Objects.equals(weight, that.weight);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tgt, weight);
    }

    /**
     * Renders the pair as {@code tgt,weight}. Without this override, text
     * output would fall back to {@link Object#toString()} and print a
     * class-name/hash string instead of the tuple.
     *
     * @return the pair in {@code tgt,weight} form, e.g. {@code 70,3}
     */
    @Override
    public String toString() {
        return tgt.get() + "," + weight.get();
    }
}

And update your code to use this as value in Mapper & Reducer. Like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;


public class Q1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyWritable.class);
        job.setMapOutputValueClass(MyWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TargetMapper extends Mapper<Object, Text, Text, MyWritable> {
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                Text target = new Text();
                target.set(edge[0]);
                int tgt = Integer.parseInt(edge[1]);
                int weight = Integer.parseInt(edge[2]);
                context.write(target, new MyWritable(tgt, weight));
            }
        }

    }

    public static class EmailsReducer extends Reducer<Text, MyWritable, Text, MyWritable> {
        private MyWritable res = new MyWritable();

        public void reduce(Text key, Iterable<MyWritable> targets, Context context) throws IOException, InterruptedException {
            int maxWeight = Integer.MIN_VALUE;
            int maxTgt = Integer.MIN_VALUE;

            for (MyWritable target : targets) {
                if (target.getWeight().get() > maxWeight) {
                    maxWeight = target.getWeight().get();
                    maxTgt = target.getTgt().get();
                }
            }

            res.set(new IntWritable(maxTgt), new IntWritable(maxWeight));

            context.write(key, res);
        }
    }
}

Upvotes: 1

Related Questions