Reputation: 3324
I want to parse PDF files in my Hadoop 2.2.0 program. I found an example, followed what it says, and so far I have these three classes:

PDFWordCount: the main class containing the map and reduce functions. It is just like the native Hadoop wordcount sample, except that it uses my PDFInputFormat class instead of TextInputFormat.

PDFRecordReader extends RecordReader<LongWritable, Text>: this is where the main work happens. In particular, here is my initialize function, for illustration:
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
    System.out.println("initialize");
    System.out.println(genericSplit.toString());
    FileSplit split = (FileSplit) genericSplit;
    System.out.println("filesplit convertion has been done");
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    System.out.println("fs has been opened");
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("going to open split");
    FSDataInputStream filein = fs.open(split.getPath());
    System.out.println("going to load pdf");
    PDDocument pd = PDDocument.load(filein);
    System.out.println("pdf has been loaded");
    PDFTextStripper stripper = new PDFTextStripper();
    in = new LineReader(new ByteArrayInputStream(
            stripper.getText(pd).getBytes("UTF-8")));
    start = 0;
    this.pos = start;
    System.out.println("init has finished");
}
(You can see my System.out.printlns for debugging.)
This method fails at converting genericSplit to FileSplit. The last thing I see in the console is this:

hdfs://localhost:9000/in:0+9396432

which is genericSplit.toString().
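(If it helps, a hypothetical extra debug line, not in my code above, could print the concrete class of the split right before the cast:)

// Hypothetical diagnostic, not part of the original initialize method:
// log the runtime type of the split before attempting the cast.
System.out.println("split class: " + genericSplit.getClass().getName());
FileSplit split = (FileSplit) genericSplit;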
PDFInputFormat extends FileInputFormat<LongWritable, Text>: this class just creates a new PDFRecordReader in its createRecordReader method.
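For completeness, a minimal sketch of that class (reconstructed from the description above rather than copied verbatim) looks like this:

// Sketch of the input format described above; the real class may differ in details.
public class PDFInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new PDFRecordReader();
    }
}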
I want to know what my mistake is. Do I need extra classes or something?
Upvotes: 1
Views: 2813
Reputation: 1
package com.sidd.hadoop.practice.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sidd.hadoop.practice.input.pdf.PdfFileInputFormat;
import com.sidd.hadoop.practice.output.pdf.PdfFileOutputFormat;

public class ReadPdfFile {

    // Identity mapper: writes each input key/value pair straight to the output.
    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // context.progress();
            context.write(key, value);
        }
    }

    // Reducer: keeps only the first value seen for each key.
    public static class MyReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        public void reduce(LongWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            if (values.iterator().hasNext()) {
                context.write(key, values.iterator().next());
            } else {
                context.write(key, new Text(""));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Read Pdf"); // deprecated in Hadoop 2.x; Job.getInstance(conf, "Read Pdf") is preferred
        job.setJarByClass(ReadPdfFile.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Custom PDF input/output formats, defined elsewhere in this project.
        job.setInputFormatClass(PdfFileInputFormat.class);
        job.setOutputFormatClass(PdfFileOutputFormat.class);
        // Clear the output directory so the job does not fail if it already exists.
        removeDir(args[1], conf);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void removeDir(String path, Configuration conf) throws IOException {
        Path output_path = new Path(path);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output_path)) {
            fs.delete(output_path, true);
        }
    }
}
Upvotes: 0
Reputation: 1459
Reading PDFs is not that difficult; you need to extend the FileInputFormat class as well as the RecordReader. The FileInputFormat should not be allowed to split PDF files, since they are binary files.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PDFInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new PDFLineRecordReader();
    }

    // Never split PDF files, even if they are larger than the HDFS block size.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}
The RecordReader then performs the reading itself (I am using PDFBox to read PDFs).
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper; // in PDFBox 2.x: org.apache.pdfbox.text.PDFTextStripper

public class PDFLineRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();

    private int currentLine = 0;
    private List<String> lines = null;

    private PDDocument doc = null;
    private PDFTextStripper textStripper = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit) split;
        final Path file = fileSplit.getPath();

        Configuration conf = context.getConfiguration();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream filein = fs.open(fileSplit.getPath());

        if (filein != null) {
            doc = PDDocument.load(filein);

            // Could the PDF be read?
            if (doc != null) {
                textStripper = new PDFTextStripper();
                String text = textStripper.getText(doc);

                lines = Arrays.asList(text.split(System.lineSeparator()));
                currentLine = 0;
            }
        }
    }

    // Returning false ends the reading process
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }

        if (currentLine < lines.size()) {
            String line = lines.get(currentLine);

            key.set(line);
            value.set("");

            currentLine++;
            return true;
        } else {
            // All lines read? -> end
            key = null;
            value = null;
            return false;
        }
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return (100.0f / lines.size() * currentLine) / 100.0f;
    }

    @Override
    public void close() throws IOException {
        // If we are done, close the document
        if (doc != null) {
            doc.close();
        }
    }
}
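If it is useful, here is a minimal driver sketch showing how this input format could be wired into a job (just a sketch; the class names and path arguments are placeholders, and the mapper simply passes the extracted lines through):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: a map-only job that writes out the lines produced by PDFLineRecordReader.
public class PDFDriver {

    // Each extracted PDF line arrives as the key, with an empty value.
    public static class PDFLineMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "PDF line reader");
        job.setJarByClass(PDFDriver.class);

        job.setInputFormatClass(PDFInputFormat.class); // the input format defined above
        job.setMapperClass(PDFLineMapper.class);
        job.setNumReduceTasks(0);                      // map-only: just emit the extracted lines

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // directory containing the PDFs
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory (must not exist)

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}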
Hope this helps!
Upvotes: 1