Reputation: 2605
Please go a little easy on me because I am only three months into Hadoop and MapReduce.
I have two files of 120 MB each. The data inside each file is completely unstructured, but with a common pattern. Because of the varying structure of the data, my requirement cannot be met by the default LineInputFormat.
Hence, while reading the file I override the isSplitable() method and prevent the split by returning false, so that one mapper can access one complete file and I can apply my logic to meet the requirement.
My machine can run two mappers in parallel, so by preventing the split I am degrading performance: the mappers run one after another, one per file, rather than two mappers running in parallel on a file.
My question is: how can I run two mappers in parallel, one for each file, so that performance improves?
For example:
When splitting was allowed:
file 1: split 1 (1st mapper) || split 2 (2nd mapper) ------ 2 min
file 2: split 1 (1st mapper) || split 2 (2nd mapper) ------ 2 min
Total time to read the two files ===== 4 min
When splitting is not allowed:
file 1: no parallelism, so (1st mapper) --------- 4 min
file 2: no parallelism, so (1st mapper) --------- 4 min
Total time to read the two files ===== 8 min (performance degraded)
What I want:
file 1 (1st mapper) || file 2 (2nd mapper) ------ 4 min
Total time to read the two files ===== 4 min
Basically, I want both files to be read at the same time by two different mappers.
Please help me achieve this scenario.
Below are my custom InputFormat and custom RecordReader code.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text> {

    @Override
    public boolean isSplitable(FileSystem fs, Path filename) {
        // Never split: each file must go to a single mapper in one piece.
        return false;
    }

    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
            JobConf job_run, Reporter reporter) throws IOException {
        return new NSI_record_reader(job_run, (FileSplit) split);
    }
}
Record Reader:
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class NSI_record_reader implements RecordReader<NullWritable, Text> {

    private final FileSplit split;
    private final JobConf job_run;
    private boolean processed = false;

    public NSI_record_reader(JobConf job_run, FileSplit split) {
        this.split = split;
        this.job_run = job_run;
    }

    @Override
    public boolean next(NullWritable key, Text value) throws IOException {
        // Emit the entire file as one record, exactly once per split.
        if (!processed) {
            byte[] content_add = new byte[(int) split.getLength()];
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job_run);
            FSDataInputStream input = null;
            try {
                input = fs.open(file);
                IOUtils.readFully(input, content_add, 0, content_add.length);
                value.set(content_add, 0, content_add.length);
            } finally {
                IOUtils.closeStream(input);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close here; the stream is closed in next().
    }

    @Override
    public NullWritable createKey() {
        return NullWritable.get();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return processed ? split.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
}
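For context, a minimal driver that wires this input format in might look like the sketch below. The driver and mapper class names (NSI_driver, NSI_mapper) are placeholders I am assuming here; my mapper is not shown. Since isSplitable() returns false, each input file becomes exactly one split, so the framework schedules one map task per file and can run both at once when two map slots are available.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Hypothetical driver sketch, using the old mapred API to match the code above.
public class NSI_driver {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(NSI_driver.class);
        conf.setJobName("nsi-whole-file");

        // Plug in the custom whole-file input format.
        conf.setInputFormat(NSI_inputformatter.class);
        conf.setMapperClass(NSI_mapper.class); // NSI_mapper is assumed, not shown
        conf.setNumReduceTasks(0);             // map-only job

        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}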
Input Sample:
<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017> <[weblogic.servlet.internal.WebAppServletContext@42e87d99 - appName: 'Agile', name: '/Agile', context-path: '/Agile', spec-version: 'null'] Root cause of ServletException.
javax.servlet.jsp.JspException: Connection reset by peer: socket write error
at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498)
at com.agile.ui.web.taglib.common.FormTag.doStartTag(FormTag.java:429)
at jsp_servlet._default.__login_45_cms._jspService(__login_45_cms.java:929)
at weblogic.servlet.jsp.JspBase.service(JspBase.java:34)
at weblogic.servlet.internal.StubSecurityHelper$ServletServiceAction.run(StubSecurityHelper.java:227)
Truncated. see log file for complete stacktrace
>
Retrieving the value for the attribute Page Two.Validation Status for the Object 769630
Retrieving the value for the attribute Page Two.Pilot Required for the Object 769630
Retrieving the value for the attribute Page Two.NPO Contact for the Object 769630
<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket as no data read from it during the configured idle timeout of 0 secs>
Thanks.
Upvotes: 0
Views: 390
Reputation: 2538
You could try to set the property -D mapred.min.split.size=209715200. In this case FileInputFormat should not split your files, because each one (120 MB) is smaller than mapred.min.split.size (200 MB), so each file becomes a single split and the two map tasks can run in parallel.
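For example, assuming your driver implements Tool so that GenericOptionsParser picks up the -D flag (the jar name and paths below are placeholders):

hadoop jar nsi-job.jar NSI_driver -D mapred.min.split.size=209715200 /input /output

Alternatively, set it directly on the JobConf in the driver. 209715200 bytes is 200 MB, larger than either 120 MB file, so each file ends up in a single split:

// Sketch: force the minimum split size above the file size (200 MB).
conf.setLong("mapred.min.split.size", 209715200L);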
Upvotes: 1