D. Müller

Reputation: 3426

Hadoop 2: Empty result when using custom InputFormat

I want to use my own FileInputFormat with a custom RecordReader to read CSV data into <Long, String> pairs.

Therefore I created the class MyTextInputFormat:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyTextInputFormat extends FileInputFormat<Long, String> {

  @Override
  public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
      reporter.setStatus(input.toString());
      return new MyStringRecordReader(job, (FileSplit)input);
  }

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return super.isSplitable(fs, filename);
  }
}

and the class MyStringRecordReader:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);

        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();

        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");

        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }

        key = lineKey.get();
        value = lineValue.toString();

        System.out.println(key);
        System.out.println(value);


        return true;
    }
}

In my Spark application I read the file by calling the sparkContext.hadoopFile method, but the following code only produces empty output:

public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() {
            @Override
            public String call(Tuple2<Long, String> arg0) throws Exception {
                System.out.println("map: " + arg0._2());
                return arg0._2();
            }
        });

        List<String> asList = inputRdd.take(10);
        for(String s : asList) {
            System.out.println(s);
        }
    }
}

I only get 10 empty lines back from the RDD.

The console output with the added prints looks like this:

=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:










Stopping...

The RDD data is printed below the ===== line (10 empty lines!). The output above the ===== line seems to come from the RDD.count call. Inside the next method the correct keys and values are printed, so what am I doing wrong?

Upvotes: 0

Views: 135

Answers (1)

Amit

Reputation: 1121

The key and value passed in to the overridden next method in your MyStringRecordReader are never modified. Java passes object references by value, so the two assignments shown below only rebind the local parameters key and value; the objects held by the caller stay exactly as they were. createValue is called once, before any line has been read, so the caller keeps seeing the empty string (and 0L for the key) that was returned at that point. That is why the prints inside next show the correct data while the RDD contains only empty strings. If you want to supply your own key and value for each record, you have to update the objects passed in to next, which requires mutable types such as LongWritable and Text instead of Long and String. The following assignments have no effect outside next:

key = lineKey.get();
value = lineValue.toString();
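
The Java behaviour behind this can be reproduced without Hadoop. The sketch below is not Hadoop code: Holder, brokenNext and workingNext are hypothetical names invented for illustration, with Holder standing in for a mutable Writable such as Text. It only shows that assigning to a parameter leaves the caller's object untouched, while mutating a reusable holder is visible to the caller.

```java
public class NextContractDemo {

    // Hypothetical mutable holder, standing in for a Writable such as Text.
    static final class Holder {
        private String content = "";

        void set(String s) {
            content = s;
        }

        @Override
        public String toString() {
            return content;
        }
    }

    // Mirrors the next() from the question: the assignment only rebinds
    // the local parameter, so the caller never sees the new value.
    static boolean brokenNext(String value, String line) {
        value = line;
        return true;
    }

    // Mirrors the reuse pattern: the callee fills the object that the
    // caller handed in, so the caller sees the new content.
    static boolean workingNext(Holder value, String line) {
        value.set(line);
        return true;
    }

    public static void main(String[] args) {
        // Like the String obtained once from createValue() before any read.
        String reused = "";
        brokenNext(reused, "line-1");
        System.out.println("broken:  [" + reused + "]");

        // Like a Text instance that next() fills on every call.
        Holder holder = new Holder();
        workingNext(holder, "line-1");
        System.out.println("working: [" + holder + "]");
    }
}
```

Applied to the question, this suggests declaring the input format and record reader with LongWritable and Text, letting next fill the objects it receives (for example by passing them straight to lineReader.next(key, value)), and converting to Long and String in the Spark map function. Because the value object is reused between calls, it should be copied (for example with toString()) before it is stored.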

Upvotes: 0
