Reputation: 181
I am trying to explore Apache Spark, and as part of that I wanted to customize the InputFormat. In my case I want to read an XML
file and convert every occurrence of <text>
into a new record.
I wrote a customized TextInputFormat
(XMLRecordInputFormat.java) that returns a customized **XMLRecordReader extends org.apache.hadoop.mapreduce.RecordReader**.
But I don't understand why the Spark master does not invoke the customized input format (XMLRecordInputFormat.class).
For some reason it continues to behave like the normal line splitter.
Following is the code:
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class CustomizedXMLReader {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</bermudaview>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<LongWritable, Text> lines = ctx.hadoopRDD(jobConf, XMLRecordInputFormat.class, LongWritable.class, Text.class);

        Function<Tuple2<LongWritable, Text>, XMLRecord> keyData =
                new Function<Tuple2<LongWritable, Text>, XMLRecord>() {
                    @Override
                    public XMLRecord call(Tuple2<LongWritable, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(Long.getLong(arg0._1.toString()));
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);
        List<XMLRecord> tupleList = words.collect();
        Iterator<XMLRecord> itr = tupleList.iterator();
        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}
// Following is the custom InputFormat implementation
public class XMLRecordInputFormat extends TextInputFormat {

    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
        XMLRecordReader r = new XMLRecordReader();
        return r;
    }
}
Upvotes: 2
Views: 800
Reputation: 181
I think I figured out the way to do it.
I was getting confused between the two APIs: org.apache.hadoop.mapred.RecordReader (an interface) and org.apache.hadoop.mapreduce.RecordReader (a class), and also which InputFormat to use with each.
It looks like org.apache.hadoop.mapred.FileInputFormat and org.apache.hadoop.mapred.RecordReader go hand in hand. Please find the complete code for parsing XML into a JavaRDD below.
In this example I am looking to parse and extract the contents of an XML tag (the <name>...</name> element configured in the record reader below).
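For anyone who hits the same confusion: in the Java API, JavaSparkContext.hadoopRDD / hadoopFile work with the old org.apache.hadoop.mapred.InputFormat (whose reader comes from getRecordReader), while newAPIHadoopFile / newAPIHadoopRDD work with the new org.apache.hadoop.mapreduce.InputFormat (createRecordReader). Mixing the two seems to be why a custom format can silently fall back to plain line splitting. Below is a minimal sketch of the two pairings; ApiPairingSketch is just an illustrative name, and XMLRecordFileInputFormat is the input format shown further down.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ApiPairingSketch {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                new SparkConf().setMaster("local").setAppName("ApiPairingSketch"));

        // Old "mapred" API: the InputFormat exposes getRecordReader(...)
        // and is consumed by hadoopRDD / hadoopFile.
        JobConf jobConf = new JobConf();
        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
        JavaPairRDD<Text, Text> oldApi = ctx.hadoopRDD(
                jobConf, XMLRecordFileInputFormat.class, Text.class, Text.class);

        // New "mapreduce" API: the InputFormat exposes createRecordReader(...)
        // and is consumed by newAPIHadoopFile / newAPIHadoopRDD.
        JavaPairRDD<LongWritable, Text> newApi = ctx.newAPIHadoopFile(
                args[0],
                org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class,
                LongWritable.class, Text.class,
                new Configuration());

        System.out.println(oldApi.count() + " records (old API) / " + newApi.count() + " lines (new API)");
    }
}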
Main class
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class CustomizedXMLReader implements Serializable {

    private static final long serialVersionUID = 1L;

    public static void main(String[] args) {
        CustomizedXMLReader reader = new CustomizedXMLReader();
        reader.readUsingFileInputFormat(args);
    }

    /**
     * Does all reading through the org.apache.hadoop.mapred.RecordReader interface. This works well.
     * @param args
     */
    public void readUsingFileInputFormat(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</name>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordFileInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<Text, Text> lines = ctx.hadoopRDD(jobConf, XMLRecordFileInputFormat.class, Text.class, Text.class);

        Function<Tuple2<Text, Text>, XMLRecord> keyData =
                new Function<Tuple2<Text, Text>, XMLRecord>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public XMLRecord call(Tuple2<Text, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(arg0._1.toString());
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);
        List<XMLRecord> tupleList = words.collect();
        Iterator<XMLRecord> itr = tupleList.iterator();
        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}
RecordReader
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;
public class XMLInterfaceRecordReader implements org.apache.hadoop.mapred.RecordReader<Text, Text> {

    private StreamXmlRecordReader in;
    private String delimiterRegex;
    private long start;
    private long pos;
    private long end;
    private static Long keyInt = 0L;

    public XMLInterfaceRecordReader(InputSplit split, JobConf arg1, Reporter rep) throws IOException {
        super();
        FileSplit fSplit = (FileSplit) split;
        this.delimiterRegex = "</name>";
        start = fSplit.getStart();
        end = start + fSplit.getLength();
        // Tell StreamXmlRecordReader which tags delimit one record.
        arg1.set("stream.recordreader.begin", "<name>");
        arg1.set("stream.recordreader.end", delimiterRegex);
        final Path file = fSplit.getPath();
        FileSystem fs = file.getFileSystem(arg1);
        FSDataInputStream fileIn = fs.open(fSplit.getPath());
        boolean skipFirstLine = false;
        if (start != 0) {
            skipFirstLine = true;
            --start;
            fileIn.seek(start);
        }
        in = new StreamXmlRecordReader(fileIn, fSplit, rep, arg1, fs);
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public Text createKey() {
        return new Text();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(Text key, Text value) throws IOException {
        in.seekNextRecordBoundary();
        // StreamXmlRecordReader returns the matched XML snippet in its key argument.
        Text xml = new Text();
        Text unused = new Text();
        in.next(xml, unused);
        if (xml.toString() != null && xml.toString().length() > 0) {
            System.out.println(xml.toString());
            System.out.println(unused.toString());
            start += in.getPos();
            key.set(new LongWritable(++keyInt).toString());
            value.set(xml.toString());
            return true;
        } else {
            return false;
        }
    }
}
File Input format
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class XMLRecordFileInputFormat extends FileInputFormat<Text, Text> {

    XMLInterfaceRecordReader reader = null;

    public XMLRecordFileInputFormat() {
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2)
            throws IOException {
        if (reader != null)
            return reader;
        else
            return new XMLInterfaceRecordReader(arg0, arg1, arg2);
    }
}
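The XMLRecord bean used in the main class is not included above; a minimal sketch of what it might look like, assuming plain String fields to match how setPos/setXml are called in readUsingFileInputFormat:
import java.io.Serializable;

// Minimal sketch of an XMLRecord bean (field names are illustrative);
// it only needs to be Serializable and carry the record position and the XML snippet.
public class XMLRecord implements Serializable {

    private static final long serialVersionUID = 1L;

    private String pos;
    private String xml;

    public String getPos() {
        return pos;
    }

    public void setPos(String pos) {
        this.pos = pos;
    }

    public String getXml() {
        return xml;
    }

    public void setXml(String xml) {
        this.xml = xml;
    }
}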
Upvotes: 1