Reputation: 1
I have two files, A.txt and B.txt:
A.txt:
col1 col2 col3 col4 col5
A 120 140 160 180
B 200 220 240 260
D 400 420 440 460
B.txt:
col1 col2 col3 col4 col5
A 110 140 160 180
B 200 220 240 260
C 600 620 640 660
1) col1 and col2 together form the primary key. If any key column differs between the two files, both records should be displayed. For example:
the 1st record in A.txt is: A 120 140 160 180
the 1st record in B.txt is: A 110 140 160 180
Here col2 has changed, so both records have to be displayed.
2) If a record is unchanged in both files (i.e. the rows look the same), only one record should be displayed.
3) All other records from both files should be displayed as well.
The final output should look like this:
A 120 140 160 180
A 110 140 160 180
B 200 220 240 260
D 400 420 440 460
C 600 620 640 660
Upvotes: 0
Views: 84
Reputation: 3849
Here is a MapReduce solution:
Put two or more files in one directory (the input, arg1); the job merges them into a single output that meets all of your requirements. For rows that share the same key (col1 + col2) but have different col3-to-last values, it emits each distinct combination. See the code comments for more info.
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileCompare extends Configured implements Tool {

    public static class FileComapreMapper extends Mapper<Object, Text, Text, Text> {
        int lineno = 0;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                lineno++;
                System.out.println(lineno + " -> " + value);
                // skip the header (assumes each file is a single split, so line 1 is that file's header);
                // comment this line out to include the header in the output
                if (lineno == 1) return;
                String[] fields = value.toString().split("\\s+"); // assuming input records are whitespace separated
                String col1_col2 = fields[0] + "," + fields[1];   // key
                String col3tolast = "";
                for (int i = 2; i < fields.length; i++)
                    col3tolast += fields[i] + ",";                // values
                col3tolast = col3tolast.substring(0, col3tolast.length() - 1); // remove the trailing ','
                context.write(new Text(col1_col2), new Text(col3tolast)); // send key/value pairs to the reducer
            } catch (Exception e) {
                System.err.println("Invalid data at line: " + lineno + " Error: " + e.getMessage());
            }
        }
    }

    public static class FileComapreReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // collect the distinct col3-to-last values for this key
            Set<Text> uniqueCol3tolast = new HashSet<Text>();
            for (Text record : values)
                uniqueCol3tolast.add(new Text(record)); // copy the value: Hadoop reuses the Text instance
            // write key + value as tab-delimited records
            for (Text col3tolast : uniqueCol3tolast)
                context.write(new Text(key.toString().replaceAll(",", "\t")),
                        new Text(col3tolast.toString().replaceAll(",", "\t")));
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FileCompare(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "merge-two-files");
        job.setJarByClass(FileCompare.class);
        job.setMapperClass(FileComapreMapper.class);
        job.setReducerClass(FileComapreReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // delete the output path if it already exists so the job can be rerun
        Path dstFilePath = new Path(args[1]);
        try {
            FileSystem fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
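To build and run it, something along these lines should work (the jar name and HDFS paths are only placeholders, adjust them to your cluster):
# compile against the Hadoop client libraries and package the classes
javac -cp $(hadoop classpath) FileCompare.java
jar cf filecompare.jar FileCompare*.class
# first argument = input directory containing A.txt and B.txt, second = output directory
hadoop jar filecompare.jar FileCompare /user/hduser/input /user/hduser/output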
Upvotes: 1
Reputation: 11090
Use Pig. Load both files, union the records, and then apply DISTINCT.
A = LOAD 'A.txt' USING PigStorage('\t');
B = LOAD 'B.txt' USING PigStorage('\t');
C = UNION A,B;
D = DISTINCT C;
DUMP D;
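DISTINCT removes only rows that are identical in every column, so the B record that looks the same in both files comes out once, while the two A records (which differ in col2) are both kept. If you want the result written to HDFS rather than dumped to the console, a STORE along these lines can replace the DUMP (the output path is just an example):
-- write the merged, de-duplicated records as tab-separated text
STORE D INTO 'merged_output' USING PigStorage('\t');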
Upvotes: 3