user3717936

Reputation: 1

File comparison in Hadoop using MapReduce

A.txt

col1       col2    col3    col4    col5
A           120     140     160     180
B           200     220     240     260
D           400     420     440     460

B.txt

col1      col2    col3    col4    col5
A          110     140     160     180
B          200     220     240     260
C          600     620     640     660

Output-File

A          120     140     160     180
A          110     140     160     180
B          200     220     240     260
D          400     420     440     460
C          600     620     640     660

1) col1 and col2 are the primary keys. If either key changes, then both records have to be displayed, like:

1st record in A.txt: A          120      140     160     180
1st record in B.txt: A          110      140     160     180

Here col2 has changed, so both records have to be displayed.

2) If there is no change in a record in both files (i.e. they look the same), only one record has to be displayed.

3) All other records from both files have to be displayed.

The final output should look like the Output-File shown above.
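
To make the expected behaviour concrete, the same union-plus-dedup logic can be sketched as a small local (non-Hadoop) Java program. The file names A.txt/B.txt, the single header row, and the whitespace delimiter are assumptions taken from the samples above, and the output order is not significant:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class LocalCompareSketch {
    public static void main(String[] args) throws IOException {
        // key = col1 + col2 (the primary key), value = distinct col3..colN strings seen for that key
        Map<String, Set<String>> recordsByKey = new TreeMap<>();
        for (String file : new String[]{"A.txt", "B.txt"}) {
            List<String> lines = Files.readAllLines(Paths.get(file));
            for (int i = 1; i < lines.size(); i++) {               // i = 1 skips the header row
                String[] f = lines.get(i).trim().split("\\s+");
                if (f.length < 3) continue;                        // ignore blank or short lines
                String key = f[0] + "\t" + f[1];
                String rest = String.join("\t", Arrays.copyOfRange(f, 2, f.length));
                recordsByKey.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(rest);
            }
        }
        // identical records collapse to one line; records that differ in any column print once each
        recordsByKey.forEach((key, rests) ->
                rests.forEach(rest -> System.out.println(key + "\t" + rest)));
    }
}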

Upvotes: 0

Views: 84

Answers (2)

Ronak Patel

Reputation: 3849

Here is a MapReduce solution: put two or more files in one input directory (arg1) and the job will merge them into a single output that meets all of your requirements. For rows that share a key (col1 + col2), it compares col3 through the last column and emits each distinct combination; see the comments for more info.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileCompare extends Configured implements Tool{

    public static class FileComapreMapper extends Mapper<Object, Text, Text, Text> {
        int lineno=0;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            try{
                lineno++;
                System.out.println(lineno + " -> " + value);
                //skip header - comment this line out to include the header in the output
                if(lineno == 1) return; 

                String[] fields = value.toString().split("\\s+"); //assuming input records are whitespace separated
                String col1_col2 = fields[0] + "," + fields[1]; //key
                String col3tolast="";
                for(int i=2; i < fields.length;i++)
                    col3tolast+=fields[i] + ","; //values

               col3tolast=col3tolast.substring(0, col3tolast.length()-1); //remove last char(',')
               context.write(new Text(col1_col2), new Text(col3tolast)); //send key, value pairs to reducer
            }catch(Exception e){
                System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
            }
        }   
    }

    public  static class FileComapreReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {
            //Get unique col3-to-last values
            Set<Text> uniqueCol3tolast = new HashSet<Text>();
            for(Text record : values)
                uniqueCol3tolast.add(new Text(record)); //copy: Hadoop reuses the Text instance between iterations
            //write key + value
            for(Text col3tolast:uniqueCol3tolast) //outputting tab-delimited records
                context.write(new Text(key.toString().replaceAll(",", "\t")), 
                        new Text(col3tolast.toString().replaceAll(",", "\t")));     
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FileCompare(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "merge-two-files");
        job.setJarByClass(FileCompare.class);
        job.setMapperClass(FileComapreMapper.class);
        job.setReducerClass(FileComapreReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return job.waitForCompletion(true) ? 0 : 1;
    } 
}
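
To run it, build a jar containing this class and launch it with hadoop jar, passing the input directory (holding A.txt, B.txt and any other files to merge) as the first argument and the output directory as the second. Note that run() deletes the output path if it already exists, so point it at a disposable location.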


Upvotes: 1

nobody

Reputation: 11090

Use Pig. Load both files, union the records, and then apply DISTINCT: identical rows collapse to a single record, while rows that differ in any column are all kept, which matches the required output.

A = LOAD 'A.txt' USING PigStorage('\t');
B = LOAD 'B.txt' USING PigStorage('\t');
C = UNION A,B;
D = DISTINCT C;
DUMP D;
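
Note that the header row shown in the question also survives the UNION (DISTINCT keeps one copy of it); if headers should not appear in the output, filter them out first, for example with a FILTER on the first column before the DISTINCT.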

Upvotes: 3
