Reputation: 1547
This is the job that joins two relations:
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.contrib.utils.join.*;
import java.io.*;

public class TwoByTwo extends Configured implements Tool
{
    public static class MapClass extends DataJoinMapperBase
    {
        protected Text generateInputTag( String inputFile)
        {
            String datasource = inputFile.split( "\\.")[ 0];
            return new Text( datasource);
        }//end generateInputTag

        protected Text generateGroupKey( TaggedMapOutput aRecord)
        {
            String line = ( ( Text) aRecord.getData()).toString();
            //between two relations there may be more than one common attribute,
            //so the group key has to include all these common attributes. Common
            //attributes begin with an '_' (underscore).
            String[] tokens = line.split( ",");
            String groupKey = "";
            for( String s : tokens)
            {
                if( s.charAt( 0) == '_')
                {
                    groupKey = groupKey + s;
                }
            }
            return new Text( groupKey);
        }//end generateGroupKey

        protected TaggedMapOutput generateTaggedMapOutput( Object value)
        {
            TaggedWritable retv = new TaggedWritable( ( Text) value);
            retv.setTag( this.inputTag);
            return retv;
        }//end generateTaggedMapOutput
    }//end MapClass

    public static class Reduce extends DataJoinReducerBase
    {
        protected TaggedMapOutput combine( Object[] tags, Object[] values)
        {
            //a record must be present in both relations to be joined
            if( tags.length < 2)
            {
                return null;
            }
            String joinedStr = "";
            for( int i = 0; i < values.length; i++)
            {
                if( i > 0)
                {
                    joinedStr += ",";
                }
                TaggedWritable tw = ( TaggedWritable) values[ i];
                String line = ( ( Text) tw.getData()).toString();
                String[] tokens = line.split( ",", 2);
                joinedStr += tokens[ 1];
            }
            TaggedWritable retv = new TaggedWritable( new Text( joinedStr));
            retv.setTag( ( Text) tags[ 0]);
            return retv;
        }//end combine
    }//end Reduce

    public static class TaggedWritable extends TaggedMapOutput
    {
        private Writable data;

        public TaggedWritable( Writable data)
        {
            this.tag = new Text( "");
            this.data = data;
        }//end TaggedWritable

        public Writable getData()
        {
            return data;
        }//end getData

        public void write( DataOutput out) throws IOException
        {
            this.tag.write( out);
            this.data.write( out);
        }//end write

        public void readFields( DataInput in) throws IOException
        {
            this.tag.readFields( in);
            this.data.readFields( in);
        }//end readFields
    }//end TaggedWritable

    public int run( String[] args) throws Exception
    {
        Configuration conf = getConf();
        JobConf job = new JobConf( conf, TwoByTwo.class);
        Path in = new Path( "relations/");
        Path out = new Path( "relout/");
        FileInputFormat.setInputPaths( job, in);
        FileOutputFormat.setOutputPath( job, out);
        job.setJobName( "TwoByTwo");
        job.setMapperClass( MapClass.class);
        job.setReducerClass( Reduce.class);
        job.setInputFormat( TextInputFormat.class);
        job.setOutputFormat( TextOutputFormat.class);
        job.setOutputKeyClass( Text.class);
        job.setOutputValueClass( TaggedWritable.class);
        job.set( "mapred.textoutputformat.separator", ",");
        JobClient.runJob( job);
        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        int res = ToolRunner.run( new Configuration(), new TwoByTwo(), args);
        System.exit( res);
    }//end main
}
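For illustration, input of the shape this code expects (file names and values here are made up, not my actual data) is one file per relation under relations/, with comma-separated records whose common join attributes begin with an underscore:

relations/relA.txt:  _k1,a1,a2
relations/relB.txt:  _k1,b1,b2

The tag is the file name up to the first dot (relA, relB), the group key is the concatenation of the underscore attributes (_k1), and combine() joins the remaining fields of matching records.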
When I run this job,
bin/hadoop jar /home/hduser/TwoByTwo.jar TwoByTwo -libjars /usr/local/hadoop/contrib/datajoin/hadoop-datajoin-1.0.3.jar
MapClass runs appropriately, but after Reduce has been running for some time I get this NoSuchMethodException:
12/10/18 16:38:17 INFO mapred.JobClient: map 100% reduce 27%
12/10/18 16:38:19 INFO mapred.JobClient: Task Id : attempt_201210181416_0013_r_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException: TwoByTwo$TaggedWritable.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:62)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271)
at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:249)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:245)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:129)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.NoSuchMethodException: TwoByTwo$TaggedWritable.<init>()
at java.lang.Class.getConstructor0(Class.java:2721)
at java.lang.Class.getDeclaredConstructor(Class.java:2002)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:109)
... 15 more
I have a problem with the nested class TaggedWritable. Why does this class cause a problem on the reduce side and not on the map side? How can I resolve this error? Does the constraint of two relations play any role in the error? Thanks for any help.
Upvotes: 3
Views: 2589
Reputation: 1547
This code solves the problem and gives the expected result. It is from here:
public static class TaggedWritable extends TaggedMapOutput
{
    private Writable data;

    public TaggedWritable()
    {
        this.tag = new Text();
    }//end empty( no-argument) constructor TaggedWritable

    public TaggedWritable( Writable data)
    {
        this.tag = new Text( "");
        this.data = data;
    }//end constructor TaggedWritable

    public Writable getData()
    {
        return data;
    }//end getData

    public void setData( Writable data)
    {
        this.data = data;
    }//end setData

    public void write( DataOutput out) throws IOException
    {
        this.tag.write( out);
        //record the concrete class of data, so readFields can recreate
        //an instance of the right type on the reduce side
        out.writeUTF( this.data.getClass().getName());
        this.data.write( out);
    }//end write

    public void readFields( DataInput in) throws IOException
    {
        this.tag.readFields( in);
        String dataClz = in.readUTF();
        try
        {
            //the try-catch is needed because Class.forName throws the checked
            //exception ClassNotFoundException; without it the compiler reports
            //"error: unreported exception ClassNotFoundException; must be
            //caught or declared to be thrown"
            if( this.data == null || !this.data.getClass().getName().equals( dataClz))
            {
                this.data = (Writable) ReflectionUtils.newInstance( Class.forName( dataClz), null);
            }
            this.data.readFields( in);
        }
        catch( ClassNotFoundException cnfe)
        {
            System.out.println( "Problem in TaggedWritable class, method readFields.");
        }
    }//end readFields
}//end TaggedWritable
As rystsov notes in his answer, an empty (no-argument) constructor is needed; adding it solves the NoSuchMethodException. With only that fix, though, another error occurs: a NullPointerException in readFields, at the line

this.data.readFields( in);

because data is still null in an instance created through the empty constructor. This is solved by writing write and readFields as above, where the class name of data is serialized alongside it and null values are handled properly.
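To see both fixes at work, here is a minimal round-trip sketch (my own test harness, assuming the fixed TaggedWritable above is compiled together with TwoByTwo; it is not framework code). It mimics what Hadoop's WritableSerialization does on the reduce side: create an instance through the empty constructor via reflection, then call readFields:

import org.apache.hadoop.io.Text;
import java.io.*;

public class RoundTripTest
{
    public static void main( String[] args) throws Exception
    {
        //serialize a tagged record, as the map side does
        TwoByTwo.TaggedWritable original = new TwoByTwo.TaggedWritable( new Text( "_k1,a1,a2"));
        original.setTag( new Text( "relA"));
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write( new DataOutputStream( bytes));

        //deserialize, as the reduce side does: reflection needs the empty
        //constructor, and readFields must cope with data being null
        TwoByTwo.TaggedWritable copy =
            TwoByTwo.TaggedWritable.class.getDeclaredConstructor().newInstance();
        copy.readFields( new DataInputStream( new ByteArrayInputStream( bytes.toByteArray())));
        System.out.println( copy.getTag() + " -> " + copy.getData());
    }
}

Without the empty constructor, the getDeclaredConstructor() call fails exactly like the stack trace above; without the null handling in readFields, the copy.readFields(...) call is where the NullPointerException is thrown.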
Upvotes: 2
Reputation: 1928
The class TaggedWritable doesn't have an empty constructor, so on the reduce phase, when your serialized data has to be read back, the job fails because it is impossible to create a TaggedWritable-typed key via reflection. You should add an empty constructor.
Your map phase finishes successfully because on the map stage your mapper creates the TaggedWritable-typed keys itself.
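For illustration, this is roughly what ReflectionUtils.newInstance does under the hood (a simplified sketch, not the actual framework code; it assumes TwoByTwo is on the classpath):

import java.lang.reflect.Constructor;

public class ReflectionDemo
{
    public static void main( String[] args) throws Exception
    {
        //without an empty constructor this line throws
        //java.lang.NoSuchMethodException: TwoByTwo$TaggedWritable.<init>()
        Constructor<TwoByTwo.TaggedWritable> ctor =
            TwoByTwo.TaggedWritable.class.getDeclaredConstructor();
        ctor.setAccessible( true);
        TwoByTwo.TaggedWritable key = ctor.newInstance();
        //the framework then calls key.readFields( in) to fill the instance
        System.out.println( "created " + key.getClass().getName());
    }
}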
Upvotes: 4