Reputation: 55
I have several (title , text )
ordered pairs obtained as an output from a MapReduce application in Hadoop using Java.
Now I would like to implement Word Count on the text field of these ordered pairs.
So my final output should look like :
(title-a , word-a-1 , count-a-1 , word-a-2 , count-a-2 ....)
(title-b , word-b-1, count-b-1 , word-b-2 , count-b-2 ....)
.
.
.
.
(title-x , word-x-1, count-x-1 , word-x-2 , count-x-2 ....)
To summarize , I want to implement wordcount separately on the output records from first mapreduce. Can someone suggest me a good way to do it or how I can chain a second map reduce job to create the above output or format it better ?
The following is the code; I borrowed it from GitHub and made some changes:
package com.org;
import javax.xml.stream.XMLStreamConstants;//XMLInputFactory;
import java.io.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.xml.stream.*;
public class XmlParser11
{
/**
 * Input format that turns every span of the input lying between a configured
 * start tag and end tag (both inclusive) into one record.
 *
 * The tags are read from the job configuration under {@link #START_TAG_KEY}
 * and {@link #END_TAG_KEY}.
 */
public static class XmlInputFormat1 extends TextInputFormat {
    /** Configuration key holding the literal start tag, e.g. {@code <page>}. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Configuration key holding the literal end tag, e.g. {@code </page>}. */
    public static final String END_TAG_KEY = "xmlinput.end";

    /**
     * Creates the reader that extracts tag-delimited blocks from a split.
     *
     * @param split   the split to read (unused here; consumed in {@code initialize})
     * @param context the task context (unused here; consumed in {@code initialize})
     * @return a new, uninitialized {@link XmlRecordReader}
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    /**
     * Record reader that scans a file split byte by byte and emits each block
     * found between the start tag and the end tag as a record. The key is the
     * stream position right after the end tag; the value is the block's bytes.
     */
    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private final LongWritable key = new LongWritable();
        private final Text value = new Text();

        /**
         * Reads the tag configuration, opens the split's file and seeks to the
         * start of the split.
         *
         * @throws IOException if either tag is missing or empty in the
         *                     configuration, or the file cannot be opened
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String startTagText = conf.get(START_TAG_KEY);
            String endTagText = conf.get(END_TAG_KEY);
            // Fail with a clear message instead of a NullPointerException here
            // or an ArrayIndexOutOfBoundsException later in readUntilMatch.
            if (startTagText == null || startTagText.isEmpty()
                    || endTagText == null || endTagText.isEmpty()) {
                throw new IOException("Both '" + START_TAG_KEY + "' and '"
                        + END_TAG_KEY + "' must be set to non-empty values");
            }
            startTag = startTagText.getBytes("utf-8");
            endTag = endTagText.getBytes("utf-8");

            FileSplit fileSplit = (FileSplit) split;
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();

            // Open the file and position the stream at the start of the split.
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(file);
            fsin.seek(start);
        }

        /**
         * Advances to the next tag-delimited block.
         *
         * @return {@code true} if a complete block was read into the current
         *         key/value, {@code false} if no further block starts in this split
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() >= end) {
                return false;
            }
            if (!readUntilMatch(startTag, false)) {
                return false;
            }
            try {
                // The start tag was consumed while searching; put it back in front.
                buffer.write(startTag);
                if (readUntilMatch(endTag, true)) {
                    key.set(fsin.getPos());
                    value.set(buffer.getData(), 0, buffer.getLength());
                    return true;
                }
            } finally {
                // Always clear the buffer so a partial block never leaks into the next record.
                buffer.reset();
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /** Closes the underlying stream if it was opened. */
        @Override
        public void close() throws IOException {
            if (fsin != null) {
                fsin.close();
            }
        }

        /**
         * Reports the fraction of the split consumed so far.
         *
         * @return a value in {@code [0, 1]}; {@code 0} for a zero-length split
         */
        @Override
        public float getProgress() throws IOException {
            if (end == start) {
                // Avoid 0/0 (NaN) on an empty split.
                return 0.0f;
            }
            // A block may run past the split end, so clamp the ratio to 1.
            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
        }

        /**
         * Consumes bytes from the stream until {@code match} has been read.
         *
         * @param match       the byte sequence to look for
         * @param withinBlock when {@code true}, every consumed byte is appended
         *                    to the record buffer and the search may run past
         *                    the end of the split
         * @return {@code true} once {@code match} was read completely;
         *         {@code false} on end of file, or (outside a block) once the
         *         split end has been passed with no partial match pending
         */
        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) {
                    return false; // end of file
                }
                if (withinBlock) {
                    buffer.write(b);
                }
                // read() yields 0..255 while byte is signed; mask so that
                // non-ASCII tag bytes compare equal.
                if (b == (match[matched] & 0xff)) {
                    matched++;
                    if (matched >= match.length) {
                        return true;
                    }
                } else {
                    // Restart the match, but let the mismatching byte itself
                    // begin a new one (e.g. "<<page>"). This is exact for tags
                    // whose first byte does not recur inside the tag.
                    matched = (b == (match[0] & 0xff)) ? 1 : 0;
                }
                // Outside a block, stop once the split end is passed and no
                // partial match is in flight.
                if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }
    }
}
/**
 * Mapper that parses one XML block per call and emits a single pair:
 * the trimmed character content of its {@code title} element(s) as key and
 * the trimmed character content of its {@code text} element(s) as value.
 */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    /** Reused for every record; creating a factory involves a provider lookup. */
    private final XMLInputFactory inputFactory = createInputFactory();

    /** Builds a StAX factory with DTD processing and external entities disabled. */
    private static XMLInputFactory createInputFactory() {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // Input files are not trusted to be free of DOCTYPE/entity tricks (XXE).
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
        return factory;
    }

    /**
     * Extracts title and text from one XML block and writes them out.
     *
     * @param key     the record key supplied by the input format
     * @param value   the XML block to parse
     * @param context the task context that receives the (title, text) pair
     * @throws IOException          if the block is not well-formed XML or the
     *                              write fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringBuilder title = new StringBuilder();
        StringBuilder text = new StringBuilder();
        try {
            // Encode explicitly as UTF-8: the parser assumes UTF-8 when the
            // fragment carries no XML declaration, whatever the platform default is.
            XMLStreamReader reader = inputFactory.createXMLStreamReader(
                    new ByteArrayInputStream(value.toString().getBytes("UTF-8")));
            try {
                String currentElement = "";
                while (reader.hasNext()) {
                    switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        currentElement = reader.getLocalName();
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        // Characters following a closing tag belong to no
                        // tracked element; do not attribute them to the last one opened.
                        currentElement = "";
                        break;
                    case XMLStreamConstants.CHARACTERS:
                        if (currentElement.equalsIgnoreCase("title")) {
                            title.append(reader.getText());
                        } else if (currentElement.equalsIgnoreCase("text")) {
                            text.append(reader.getText());
                        }
                        break;
                    default:
                        break;
                    }
                }
            } finally {
                reader.close();
            }
        } catch (XMLStreamException e) {
            throw new IOException("Failed to parse XML record with key " + key, e);
        }
        context.write(new Text(title.toString().trim()), new Text(text.toString().trim()));
    }
}
/**
 * Reducer that renders every (name, value) pair as a
 * {@code <property>} XML element and wraps this reducer's whole output in a
 * {@code <Start>} ... {@code </Start>} element.
 */
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    private final Text outputKey = new Text();

    /** Writes the opening {@code <Start>} line before any record is reduced. */
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("<Start>"), null);
    }

    /** Writes the closing {@code </Start>} line after the last record. */
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("</Start>"), null);
    }

    /**
     * Emits one {@code <property>} element per value of the given key. The
     * element is written as the output key with a {@code null} value.
     *
     * @param key     the property name
     * @param values  the property values grouped under that name
     * @param context the task context that receives the rendered elements
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            outputKey.set(constructPropertyXml(key, value));
            context.write(outputKey, null);
        }
    }

    /**
     * Renders a name/value pair as
     * {@code <property><name>..</name><value>..</value></property>}.
     * The characters {@code &}, {@code <} and {@code >} in either part are
     * replaced by their XML entities so the result stays well-formed.
     *
     * @param name  the property name
     * @param value the property value
     * @return the XML fragment
     */
    public static String constructPropertyXml(Text name, Text value) {
        StringBuilder sb = new StringBuilder();
        sb.append("<property><name>").append(escapeXml(String.valueOf(name)))
                .append("</name><value>").append(escapeXml(String.valueOf(value)))
                .append("</value></property>");
        return sb.toString();
    }

    /** Replaces the characters that are markup in XML element content. */
    private static String escapeXml(String raw) {
        StringBuilder escaped = new StringBuilder(raw.length() + 16);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
            case '&':
                escaped.append("&amp;");
                break;
            case '<':
                escaped.append("&lt;");
                break;
            case '>':
                escaped.append("&gt;");
                break;
            default:
                escaped.append(c);
                break;
            }
        }
        return escaped.toString();
    }
}
/**
 * Configures and runs the XML parsing job, then exits with the job's status.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: XmlParser11 <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    // Use the input format's own constants so the keys cannot drift apart.
    conf.set(XmlInputFormat1.START_TAG_KEY, "<page>");
    conf.set(XmlInputFormat1.END_TAG_KEY, "</page>");

    Job job = new Job(conf);
    job.setJarByClass(XmlParser11.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(XmlParser11.Map.class);
    job.setReducerClass(XmlParser11.Reduce.class);
    job.setInputFormatClass(XmlInputFormat1.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Propagate job failure to the caller instead of always exiting with 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
The word count code we find online counts the words across all files and outputs the totals. I want to do the wordcount for each of text fields separately. The above mapper is used to pull the title and text from an XML document. Is there any way I can do the wordcount in the same mapper. If I do that, my next question is how to pass the counts, along with the already existing key-value pairs (title, text), to the reducer. Sorry, I am not able to phrase my question properly, but I hope the reader gets the idea.
Upvotes: 0
Views: 2401
Reputation: 376
I suggest you use regular expressions
to perform the mapping and grouping. The Hadoop examples jar file provides a Grep class; using it, you can map your HDFS data with a regular expression and then group the mapped data.
Upvotes: 0
Reputation: 34184
I'm not sure if I have understood it properly. So I have many questions along with my answer.
First of all whoever has written this code is probably trying to show how to write a custom InputFormat to process xml data using MR. I don't know how it is related to your problem.
To summarize , I want to implement wordcount separately on the output records from first mapreduce. Can someone suggest me a good way to do it
Read the output file generated by first MR and do it.
or how I can chain a second map reduce job to create the above output or format it better ?
You can definitely chain jobs together in this fashion by writing multiple driver methods, one for each job. See this for more details and this for an example.
I want to do the wordcount for each of text fields separately.
What do you mean by separately? In the traditional wordcount program, the count of each word is calculated independently of the others.
Is there any way I can do the wordcount in the same mapper.
I hope you have understood the wordcount program properly. In the traditional wordcount program you read the input file one line at a time, split the line into words, and then emit each word as the key with 1 as the value. All this happens inside the Mapper, which is essentially the same Mapper. The total count for each word is then determined in the Reducer part of your job. If you wish to emit the words with their total counts from the mapper itself, you have to read the whole file in the Mapper and do the counting there. For that you need to override isSplitable() in your InputFormat to return false, so that your input file is read as a whole and goes to just one Mapper.
When you emit something from Mapper and if it is not a Map only job, the output of your Mapper automatically goes to the Reducer. Do you need something else?
Upvotes: 0