I want to use my own FileInputFormat with a custom RecordReader to read CSV data into <Long, String> pairs. Therefore I created the class MyTextInputFormat:
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class MyTextInputFormat extends FileInputFormat<Long, String> {

    @Override
    public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new MyStringRecordReader(job, (FileSplit) input);
    }

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return super.isSplitable(fs, filename);
    }
}
and the class MyStringRecordReader:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");
        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }
        key = lineKey.get();
        value = lineValue.toString();
        System.out.println(key);
        System.out.println(value);
        return true;
    }
}
In my Spark application I read the file by calling the sparkContext.hadoopFile method, but I only get empty output from the following code:
public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class)
                .map(new Function<Tuple2<Long, String>, String>() {
                    @Override
                    public String call(Tuple2<Long, String> arg0) throws Exception {
                        System.out.println("map: " + arg0._2());
                        return arg0._2();
                    }
                });
        List<String> asList = inputRdd.take(10);
        for (String s : asList) {
            System.out.println(s);
        }
    }
}
I only get 10 empty lines back from the RDD. The console output, with the added prints, looks like the following:
=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
Stopping...
(The RDD data is printed below the ===== line: 10 empty lines!!!) The output above the ===== seems to be produced by the RDD.count call. Inside the next method the correct keys and values are shown!? What am I doing wrong?
lineKey and lineValue are never transferred into the key and value passed to the overridden next method of your MyStringRecordReader. That is why you always see the empty string when you try to use your RecordReader: Java passes references by value, so reassigning the key and value parameters inside next only rebinds the method's local variables; the caller never sees those assignments and keeps the objects it got from createKey() and createValue().

If you want a different key and value for a record in the file, then you need to take the key and value passed into the next method and initialize them with your computed key and value. If you do not intend to change the key/value record, then get rid of the following two lines; every time this code executes, it merely reassigns the local parameters, while the caller is left with the empty string and 0L it started with:
key = lineKey.get();
value = lineValue.toString();
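For illustration, here is a minimal sketch (not from the original answer) of how the old mapred API expects next to behave. It assumes MyStringRecordReader is retyped as RecordReader<LongWritable, Text>, because Long and String are immutable and cannot be filled in place, whereas LongWritable and Text can be mutated:

    // Sketch only: next() for a RecordReader<LongWritable, Text>.
    // The old mapred contract is that next() copies the record it just
    // read into the caller-supplied key/value objects.
    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        if (!lineReader.next(lineKey, lineValue)) {
            return false; // end of this split
        }
        key.set(lineKey.get()); // mutate the caller's key in place
        value.set(lineValue);   // mutate the caller's value in place
        return true;
    }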
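On the Spark side, the call would then pass the Writable classes and convert to plain types inside the map. This is a hypothetical sketch that assumes MyTextInputFormat is likewise retyped to FileInputFormat<LongWritable, Text>; converting to String immediately matters because Hadoop reuses the same Writable instances across records:

    // Hypothetical usage; sc and inputFilePath are the same variables
    // assumed in the question's code.
    JavaRDD<String> inputRdd = sc
            .hadoopFile(inputFilePath, MyTextInputFormat.class, LongWritable.class, Text.class)
            .map(new Function<Tuple2<LongWritable, Text>, String>() {
                @Override
                public String call(Tuple2<LongWritable, Text> record) throws Exception {
                    return record._2().toString(); // copy out of the reused Text
                }
            });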