Reputation: 2571
I have input records like this: a|1|Y, b|0|N, c|1|N, d|2|Y, e|1|Y
In the mapper, I have to check the value of the third column. If it is 'Y', the record has to be written directly to the output file without going to the reducer; otherwise (value 'N'), the record has to go to the reducer for further processing.
So a|1|Y, d|2|Y, e|1|Y should not go to the reducer, but b|0|N, c|1|N should go to the reducer and then to the output file.
How can I do this?
Upvotes: 2
Views: 133
Reputation: 34184
See if this works,
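import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Xxxx {

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        private FSDataOutputStream out;

        // Open one side file per map task instead of one per input line.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileSplit split = (FileSplit) context.getInputSplit();
            String fileName = split.getPath().getName();
            // The task attempt ID keeps the name unique across tasks and retries.
            // A relative path like this one resolves against the user's HDFS home directory.
            out = fs.create(new Path(fileName + "-m-" + context.getTaskAttemptID()));
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A line holds several comma-separated records such as a|1|Y.
            for (String s : value.toString().split(",")) {
                String record = s.trim();
                String[] parts = record.split("\\|");
                if (parts.length < 3) {
                    continue; // skip malformed records
                }
                if (parts[2].equals("Y")) {
                    // 'Y' records bypass the reducer and go straight to the side file.
                    out.writeBytes(record + "\n");
                } else {
                    // Everything else is sent on to the reducer, one record at a time.
                    context.write(key, new Text(record));
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Close only the stream: FileSystem.get() returns a shared, cached instance,
            // so calling fs.close() here could break other users of the same FileSystem.
            out.close();
        }
    }

    public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

        @Override
        public void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reducer: replace with the real processing for 'N' records.
            for (Text t : values) {
                context.write(key, t);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        conf.set("mapred.job.tracker", "localhost:9001");
        Job job = new Job(conf, "Xxxx");
        job.setJarByClass(Xxxx.class);
        Path outPath = new Path("/output_path");
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/input.txt"));
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}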
Upvotes: 1
Reputation: 13
In your map function, you get the input line by line. Split each record using | as the delimiter with the String.split() method. Because split() takes a regular expression and | is a regex metacharacter, the delimiter has to be escaped.
It will look like this
String[] line = value.toString().split("\\|");
Access the third element of this array with line[2].
Then, using a simple if-else statement, emit only the records whose value is N so that they go on to the reducer for further processing.
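The split-and-branch step above can be tried outside Hadoop first. The following is only a minimal plain-Java sketch of that check; the class name RecordRouter and the methods bypassesReducer and forReducer are hypothetical names made up for this illustration, and in a real mapper the call to forReducer would be replaced by context.write().

```java
import java.util.ArrayList;
import java.util.List;

public class RecordRouter {

    // Returns true when the third pipe-delimited field is "Y".
    public static boolean bypassesReducer(String record) {
        // "|" is a regex metacharacter, so it must be escaped for String.split().
        String[] fields = record.trim().split("\\|");
        return fields.length >= 3 && fields[2].equals("Y");
    }

    // Collects the records that would be emitted to the reducer.
    public static List<String> forReducer(String[] records) {
        List<String> out = new ArrayList<>();
        for (String r : records) {
            if (!bypassesReducer(r)) {
                out.add(r.trim());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] input = {"a|1|Y", "b|0|N", "c|1|N", "d|2|Y", "e|1|Y"};
        System.out.println(forReducer(input)); // prints [b|0|N, c|1|N]
    }
}
```

Note that this sketch treats a record with fewer than three fields as one that still goes to the reducer; whether malformed records should be dropped instead is a design choice the question does not settle.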
Upvotes: -1
Reputation: 5063
What you can probably do is use MultipleOutputs to separate the 'Y' and 'N' records into two different files from the mappers.
Next, you run separate jobs for the two newly generated 'Y' and 'N' data sets. For the 'Y' type, set the number of reducers to 0 so that reducers aren't used. For the 'N' type, process the records the way you want using reducers.
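As a rough sketch of the first step, a map-only splitter using MultipleOutputs might look like the following. The class name SplitMapper and the base paths "Y/part" and "N/part" are assumptions chosen for illustration, and the sketch assumes the driver calls job.setNumReduceTasks(0) and keeps the default text output format. It needs the Hadoop client libraries on the classpath.

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Hypothetical map-only splitter; intended to run with zero reducers.
public class SplitMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumes comma-separated records per line, each shaped like a|1|Y.
        for (String s : value.toString().split(",")) {
            String record = s.trim();
            String[] parts = record.split("\\|");
            if (parts.length < 3) {
                continue; // skip malformed records
            }
            // Files land under <job output dir>/Y/ or <job output dir>/N/.
            String base = parts[2].equals("Y") ? "Y/part" : "N/part";
            mos.write(NullWritable.get(), new Text(record), base);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed, or the files may be left incomplete.
        mos.close();
    }
}
```

With this layout, the N subdirectory can be given as the input path of the follow-up job that uses reducers, while the Y subdirectory already holds the records that need no further processing.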
Hope this helps.
Upvotes: 2