Reputation: 8144
I'm using Hadoop MapReduce for the following task:
My mapper reads an .xml file from HDFS and passes it to a service. I have code like this in my setup():
try
{
    System.out.println(propertyName);
    session = FindPath.createSession("localhost", 3250, EncodingConstants.en_ISO_8859_1);
    session.open();
}
catch (Exception e)
{
    System.out.println("error");
}
When the datanode does not have FindPath running (the FindPath service has not been started yet), this throws an exception.
My problem is that my mapper task is not moved to the other node in the cluster, which does have the FindPath service running.
Example: there are two data nodes, DataNode 1 and DataNode 2.
If DataNode 1 has not yet started the FindPath service, the input should move to DataNode 2.
How can I achieve that?
And how can I change the TaskStatus on DataNode 1 to 'Failed' when it throws an exception?
UPDATE
Job j;
catch (Exception Ex)
{
    j.failTask((TaskAttemptID) context.getTaskAttemptID());
    System.out.println("error");
}
I have used something like this, but it throws a NullPointerException.
How do I use failTask in my mapper (or in the mapper's setup()) with the new API?
Upvotes: 3
Views: 902
Reputation: 692
Use the JobClient to access the RunningJob class (I'm on the 1.0.4 API).
Keep a JobClient and a RunningJob reference in your setup(); the method looks like this:
// Uses the old (mapred) API classes: org.apache.hadoop.mapred.JobClient,
// JobConf, RunningJob, JobID and TaskAttemptID.
public void setup(Context context) throws IOException, InterruptedException
{
    JobClient jobClient;
    RunningJob runningJob = null;
    try
    {
        jobClient = new JobClient((JobConf) context.getConfiguration());
        runningJob = jobClient.getJob((JobID) context.getJobID()); // mapred.JobID!
    }
    catch (IOException e)
    {
        System.out.println("IO Exception");
    }
    try
    {
        System.out.println(propertyName);
        session = FindPath.createSession("localhost", 3250, EncodingConstants.en_ISO_8859_1);
        session.open();
    }
    catch (Exception e)
    {
        System.out.println("error");
        // cast as mapred.TaskAttemptID
        runningJob.killTask((TaskAttemptID) context.getTaskAttemptID(), true);
    }
}
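If the raw casts above throw a ClassCastException at runtime, here is a minimal sketch of the same idea using the 1.x downgrade() helpers (JobID.downgrade and TaskAttemptID.downgrade from the mapred package) instead of casting; FindPathMapper and openFindPathSession() are just placeholder names for your own mapper and FindPath setup code:

// Sketch only (Hadoop 1.x): bridge the new (mapreduce) API context to the
// old (mapred) API client with the downgrade() helpers instead of raw casts.
import java.io.IOException;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.Mapper;

public class FindPathMapper extends Mapper<Object, Object, Object, Object>
{
    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        // Wrap the new-API Configuration in a JobConf instead of casting it.
        JobConf jobConf = new JobConf(context.getConfiguration());
        JobClient jobClient = new JobClient(jobConf);

        // Convert the mapreduce.JobID into a mapred.JobID.
        RunningJob runningJob = jobClient.getJob(JobID.downgrade(context.getJobID()));

        try
        {
            openFindPathSession(); // placeholder for the FindPath session code
        }
        catch (Exception e)
        {
            // true = mark this attempt as failed, not merely killed.
            runningJob.killTask(TaskAttemptID.downgrade(context.getTaskAttemptID()), true);
        }
    }

    private void openFindPathSession()
    {
        // placeholder for: session = FindPath.createSession(...); session.open();
    }
}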
This causes the TaskAttempt to fail (passing true as the second argument to killTask marks the attempt as failed rather than just killed).
Finally, you should probably set mapred.map.max.attempts to 1, so that a failed task attempt means a failed task.
You should also consider adjusting mapred.max.map.failures.percent, as it controls how many failed map tasks your job tolerates before the whole job fails.
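For reference, a minimal driver-side sketch of setting those two properties (assuming the 1.x property names; the job name and the 0% failure tolerance below are just placeholders):

// Sketch of the driver-side configuration (Hadoop 1.x property names).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FindPathDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // One attempt per map task: a failed attempt fails the task immediately.
        conf.setInt("mapred.map.max.attempts", 1);

        // Percentage of map tasks allowed to fail before the whole job fails.
        conf.setInt("mapred.max.map.failures.percent", 0);

        Job job = new Job(conf, "findpath-job");
        // ... set the mapper class, input/output formats and paths here ...

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}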
Upvotes: 2