Gurupraveen

Reputation: 181

Spark master does not invoke Custom InputFormat

I am trying to explore Apache Spark, and as part of that I wanted to customize the InputFormat. In my case I wanted to read an XML file and convert every occurrence of <text> into a new record.

I wrote a customized TextInputFormat (XMLRecordInputFormat.java) that returns a custom **XMLRecordReader extends org.apache.hadoop.mapreduce.RecordReader**.

But I don't understand why the Spark master does not invoke the customized input format (XMLRecordInputFormat.class). For some reason it continues to behave like the normal line splitter.

Following is the code:

import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;


import scala.Tuple2;

public class CustomizedXMLReader{

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("CustomizedXMLReader")
        .set("spark.executor.memory", "512m").set("record.delimiter.regex", "</bermudaview>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));


        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<LongWritable, Text> lines = ctx.hadoopRDD(jobConf,XMLRecordInputFormat.class, LongWritable.class, Text.class);



        Function<Tuple2<LongWritable, Text>, XMLRecord> keyData =
              new Function<Tuple2<LongWritable, Text>, XMLRecord>() {
                     @Override
                    public XMLRecord call(Tuple2<LongWritable, Text> arg0)
                            throws Exception {
                         System.out.println(arg0.toString());
                         XMLRecord record = new XMLRecord();
                         // parse the byte-offset key (Long.getLong() would look up a system property instead)
                         record.setPos(Long.parseLong(arg0._1.toString()));
                         record.setXml(arg0._2.toString());                      
                         return record;                         
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);      

        List<XMLRecord>  tupleList = words.collect();    

        Iterator<XMLRecord> itr = tupleList.iterator();

        while(itr.hasNext()){
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}

//following custom InputFormat implementation

public class XMLRecordInputFormat extends TextInputFormat{

    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
        XMLRecordReader r = new XMLRecordReader();

        return r;
    }


}

Upvotes: 2

Views: 800

Answers (1)

Gurupraveen

Reputation: 181

I think I figured out the way to do it.

I was getting confused between the APIs: org.apache.hadoop.mapred.RecordReader (an interface) and org.apache.hadoop.mapreduce.RecordReader (a class), and also about which InputFormat to use.

It looks like org.apache.hadoop.mapred.FileInputFormat and org.apache.hadoop.mapred.RecordReader go hand in hand.
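
Concretely, the method an old-API FileInputFormat subclass has to implement, and the one Spark's hadoopRDD actually calls, is:

// declared by org.apache.hadoop.mapred.InputFormat / FileInputFormat
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;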

In this example I am looking to parse and extract the XML tag <name> ... </name>. Please find below the complete code for parsing the XML into a JavaRDD.
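
For illustration, a minimal input file could look like the one below (the content and root element are made up; only the <name> ... </name> delimiters come from the code that follows). Each <name> block becomes one record:

<records>
    <name>first record</name>
    <name>second record</name>
</records>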

Main class

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;


import scala.Tuple2;

public class CustomizedXMLReader implements Serializable{

    private static final long serialVersionUID = 1L;


    public static void main(String[] args) {        
        CustomizedXMLReader reader = new CustomizedXMLReader();     
        reader.readUsingFileInputFormat(args);
    }



    /**
     * Does all the reading using the org.apache.hadoop.mapred.RecordReader interface. This works as expected.
     * @param args
     */
        public void readUsingFileInputFormat(String[] args){
            SparkConf sparkConf = new SparkConf()
            .setMaster("local")
            .setAppName("CustomizedXMLReader")
            .set("spark.executor.memory", "512m").set("record.delimiter.regex", "</name>");

            JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
            jobConf.setInputFormat(XMLRecordFileInputFormat.class);
            FileInputFormat.setInputPaths(jobConf, new Path(args[0]));


            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            JavaPairRDD<Text, Text> lines = ctx.hadoopRDD(jobConf,XMLRecordFileInputFormat.class, Text.class, Text.class);



            Function<Tuple2<Text, Text>, XMLRecord> keyData =
                  new Function<Tuple2<Text, Text>, XMLRecord>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public XMLRecord call(Tuple2<Text, Text> arg0)
                                throws Exception {
                             System.out.println(arg0.toString());
                             XMLRecord record = new XMLRecord();
                             record.setPos(arg0._1.toString());
                             record.setXml(arg0._2.toString());                      
                             return record;                         
                        }
                    };

            JavaRDD<XMLRecord> words = lines.map(keyData);      

            List<XMLRecord>  tupleList = words.collect();    

            Iterator<XMLRecord> itr = tupleList.iterator();

            while(itr.hasNext()){
                XMLRecord t = itr.next();
                System.out.println(t.getXml());
                System.out.println(t.getPos());


            }
        }
}
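
Note that collect() pulls every parsed record back to the driver. For a larger input file it may be preferable to sample a few records instead, for example:

// inspect only the first few records instead of collecting the whole RDD
List<XMLRecord> sample = words.take(10);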

RecordReader

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class XMLInterfaceRecordReader implements org.apache.hadoop.mapred.RecordReader<Text,Text>{

     private StreamXmlRecordReader in;
     private String delimiterRegex;
      private long start;
      private long pos;
      private long end;
      private static Long keyInt = 0L;


      public XMLInterfaceRecordReader(InputSplit split, JobConf arg1, Reporter rep) throws IOException {
          super();
          FileSplit fSplit = (FileSplit) split;

          this.delimiterRegex = "</name>";

            start = fSplit.getStart();
            end = start + fSplit.getLength();
            arg1.set("stream.recordreader.begin", "<name>");
            arg1.set("stream.recordreader.end", delimiterRegex);


            final Path file = fSplit.getPath();
            FileSystem fs = file.getFileSystem(arg1);
            FSDataInputStream fileIn = fs.open(fSplit.getPath());

            boolean skipFirstLine = false;
            if (start != 0) {
                skipFirstLine = true;           
                --start;
                fileIn.seek(start);
            }

            in = new StreamXmlRecordReader(fileIn, fSplit,rep, arg1,fs);

            this.pos = start;
    }


    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }       
    }

    @Override
    public Text createKey() {
        return new Text();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            // report how much of this split has been consumed
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public boolean next(Text Key, Text Value) throws IOException {
        in.seekNextRecordBoundary();
        Text key = new Text();
        Text val = new Text();
        in.next(key, val);

        // key.toString() is never null; an empty key means no more records in this split
        if (key.toString().length() > 0) {
            System.out.println(key.toString());
            System.out.println(val.toString());
            start += in.getPos();
            // emit a running record number as the key and the matched XML block as the value
            Key.set(new LongWritable(++keyInt).toString());
            Value.set(key.toString());
            return true;
        } else {
            return false;
        }
    }

}
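
The begin and end tags are hardcoded in the constructor. If they ever need to change, they could be read from the JobConf instead; a small sketch, assuming a hypothetical xmlinput.element property:

// hypothetical: make the element name configurable instead of hardcoding "name"
String element = arg1.get("xmlinput.element", "name");
arg1.set("stream.recordreader.begin", "<" + element + ">");
arg1.set("stream.recordreader.end", "</" + element + ">");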

FileInputFormat

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class XMLRecordFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
            JobConf arg1, Reporter arg2) throws IOException {
        // hand Spark a fresh reader for every split
        return new XMLInterfaceRecordReader(arg0, arg1, arg2);
    }

}
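
For completeness: if you would rather stay on the new org.apache.hadoop.mapreduce API (as in my original attempt), the InputFormat has to extend org.apache.hadoop.mapreduce.lib.input.TextInputFormat, override createRecordReader(InputSplit, TaskAttemptContext), and be wired up through newAPIHadoopFile rather than hadoopRDD. A rough sketch (XMLNewApiInputFormat is a hypothetical class, not the one above):

// new-API entry point; hadoopRDD only understands the old mapred interfaces
Configuration conf = new Configuration();
JavaPairRDD<LongWritable, Text> records =
        ctx.newAPIHadoopFile(args[0], XMLNewApiInputFormat.class,
                LongWritable.class, Text.class, conf);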

Upvotes: 1
