Reputation: 2726
All my mappers fail with the exception below. I've only shown the last failure for conciseness.
Why would this be happening and how do I fix it?
16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readUTF(DataInputStream.java:609)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient: Job Counters
16/09/21 17:02:00 INFO mapred.JobClient: Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient: Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient: Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient: Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
I'm using an Accumulo table as my input data. My setup is as follows:
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
conf.set(TYPE_ID_MAP_KEY, idMapFileContent);
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
job.setMapperClass(DanglingLinksFinderMapper.class);
job.setReducerClass(DanglingLinksFinderReducer.class);
this.setupRowInputFormat(job);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path out = new Path(args[0]);
LOGGER.info("Writing to output directory: " + out.toUri());
FileOutputFormat.setOutputPath(job, out);
int exitCode = job.waitForCompletion(true) ? 0 : 1;
}
private Job setupRowInputFormat(Job job)
throws IOException, AccumuloSecurityException
{
job.setInputFormatClass(AccumuloRowInputFormat.class);
Configuration conf = job.getConfiguration();
AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
LOGGER.info(connectInfo.toString());
AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
return job;
}
I'm using Hadoop 2.6.0, Accumulo 1.5.0, and Java 1.7.
I had this working the other day and didn't (to my knowledge) change anything. So I'm thinking maybe it has something to do with configuration or data state on the server I'm running it on? The job works fine on a test table running in a Docker container on my local machine, but it fails on my remote test server.
I can log into the accumulo shell
and scan the table I'm working with. Everything looks fine there. I also tried running a compaction on the test server, which worked fine but did not fix the problem.
Upvotes: 3
Views: 393
Reputation: 1236
I'm guessing that you have a version mismatch of the Accumulo jars you use to launch the MapReduce job and those that you include for the job itself to use (Mappers/Reducers) via DistributedCache or the libjars CLI option.
Because you specified no ranges, AccumuloInputFormat will automatically fetch all of the Tablet boundaries for your table, and create the same number of RangeInputSplit objects as you have Tablets in the table. This split creation is done in the local JVM (the JVM created when you submit your job). These RangeInputSplit objects are serialized and passed into YARN.
The error you provided is when a Mapper takes one of these serialized RangeInputSplit objects and tries to deserialize it. Some how, this is failing because there is not enough serialized data to deserialize what the version of Accumulo running in the Mapper expects to read.
It's possible that this is just a serialization error in your version of Accumulo (please do share that), but I don't recall hearing about such an error. I would guess that there's a difference in the version of Accumulo on the local classpath and the Mapper's classpath.
Upvotes: 2