Peter2711

Reputation: 899

select distinct query in java map reduce

My input file contains records like this:

10001|76884|1995-06-24|1996-06-23
10001|76884|1995-06-24|1996-06-23
10001|75286|1993-06-24|1994-06-24

My goal is to remove the duplicate records so that the output looks like this:

10001|76884|1995-06-24|1996-06-23
10001|75286|1993-06-24|1994-06-24

I wrote the following code:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class charterSelDistRec {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private String tableKey, tableValue;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String splitarray[] = line.split("\\|", 2);
            tableKey = splitarray[0].trim();
            tableValue = splitarray[1].trim();

            context.write(new Text(tableKey), new Text(tableValue));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, Context context)
                throws IOException, InterruptedException {
            String ColumnDelim = "";
            String tableOutValue = ColumnDelim + values;
            context.write(new Text(key), new Text(tableOutValue));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "charterSelDistRec");
        job.getConfiguration().set("mapreduce.job.queuename", "root.Dev");
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", "|");
        job.setJobName("work_charter_stb.ext_chtr_vod_fyi_mapped");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJarByClass(charterSelDistRec.class);
        job.waitForCompletion(true);
    }
}

But the output file still contains duplicates. Please let me know where I am going wrong.

Upvotes: 1

Views: 3998

Answers (3)

Turbero

Reputation: 126

Try this. The idea is to emit only the first value of the Iterable, since they are all the same and you want to remove the duplicates.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class charterSelDistRec {

    public  static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable ignore, Text value, Context context)
            throws IOException, InterruptedException {
            context.write(value, value);
        }  
    }

    public static class MyReducer extends Reducer<Text, Text, Text, NullWritable> {    
      @Override
      public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
          for (Text value : values){
              context.write(value, NullWritable.get());
              break;
          }
      }
    }       

  /* This is your main. Changed the output value class, added the map output classes and fixed the mapper/reducer class names */
  public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = new Job(conf,"charterSelDistRec");
      job.getConfiguration().set("mapreduce.job.queuename", "root.Dev");
      job.getConfiguration().set("mapreduce.output.textoutputformat.separator","|");
      job.setJobName("work_charter_stb.ext_chtr_vod_fyi_mapped");
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);
      // the mapper still emits Text values, so the map output classes must be set explicitly
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);


      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      job.setJarByClass(charterSelDistRec.class); 
      job.waitForCompletion(true);
   }
}

Upvotes: 0

K246

Reputation: 1107

It need not be so complicated. All you have to do is:

  1. In the mapper, emit each line as the key and any value.

  2. In the reducer, just emit the keys and ignore the values.

Sharing code:

Here is the input:

10001|76884|1995-06-24|1996-06-23
10001|76884|1995-06-24|1996-06-23
10001|75286|1993-06-24|1994-06-24

Here is the code:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StackRemoveDup {

    public  static class MyMapper extends Mapper<LongWritable,Text, Text, NullWritable> {

        @Override
        public void map(LongWritable ignore, Text value, Context context)
            throws java.io.IOException, InterruptedException {
            context.write(value,NullWritable.get());
        }  
    }

    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

      @Override
      public void reduce(Text key, Iterable<NullWritable> values, Context context)
          throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
      }
    }       

  public static void main(String[] args) 
                  throws IOException, ClassNotFoundException, InterruptedException {

    Job job = new Job();
    job.setJarByClass(StackRemoveDup.class);
    job.setJobName("StackRemoveDup");

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    job.waitForCompletion(true);
  }
}

Here is the output:

10001|75286|1993-06-24|1994-06-24
10001|76884|1995-06-24|1996-06-23

Upvotes: 4

Ramzy

Reputation: 7138

You appear to have two records in the first row and one record in the second row. When the map reads a line you split on |, but your rows (entities) seem to be separated by a space, from what I can see. Verify whether that is really how the actual data looks. The conventional format is one record (entity) per line; MapReduce then groups identical keys after the map phase, so once your input is in that format, all the reducer sees are unique keys.

If your input is different (like above, with two records in the same line), you need to consider a different input format or handle the logic differently, as sketched below. Understanding how MapReduce works and the input formats it accepts will help you. Happy learning.
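
If each physical line really does hold more than one record, one way to handle the logic differently is to split the line inside the mapper and emit every record as its own key; the shuffle then de-duplicates exactly as in the answers above. This is only a minimal sketch under that assumption: the class name MultiRecordMapper and the whitespace separator are my own choices, not something stated in the question.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper for the case where one input line carries several
// whitespace-separated records: each record becomes its own map output key,
// so identical records collapse into one group before the reducer runs.
public class MultiRecordMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Text record = new Text();

    @Override
    public void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumption: records inside a line are separated by whitespace.
        for (String token : value.toString().trim().split("\\s+")) {
            if (!token.isEmpty()) {
                record.set(token);
                context.write(record, NullWritable.get());
            }
        }
    }
}

The reducer from either answer above (emit the key, drop the values) can then be reused unchanged to produce the distinct records.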

Upvotes: 0
