Reputation: 93
I am trying to run a recommender example from chapter 6 (listings 6.1 to 6.4) of the ebook Mahout in Action. There are two mapper/reducer pairs. Here is the code:
Mapper - 1
public class WikipediaToItemPrefsMapper extends
        Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {
    // Matches each run of digits; the backslash must be doubled in Java source.
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
    @Override
    public void map(LongWritable key,
                    Text value,
                    Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);
        // The first number on the line is the user ID.
        m.find();
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
        VarLongWritable itemID = new VarLongWritable();
        // Every remaining number is an item ID; emit one (userID, itemID) pair each.
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}
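To make the parsing step concrete, here is a minimal stand-alone sketch of the same regex logic outside Hadoop. The class name LineParsingSketch and the sample input line are made up for illustration; the sketch assumes only that each line holds a user ID followed by item IDs, as the mapper above does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-alone sketch of the number extraction done in the first mapper.
final class LineParsingSketch {
    // Same pattern as the mapper: one match per run of digits.
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    // Returns every number on the line: element 0 is the user ID,
    // the remaining elements are item IDs.
    static List<Long> parse(String line) {
        List<Long> ids = new ArrayList<>();
        Matcher m = NUMBERS.matcher(line);
        while (m.find()) {
            ids.add(Long.parseLong(m.group()));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Hypothetical input line, invented for this example.
        List<Long> ids = parse("98955: 590 22 9059");
        long userID = ids.get(0);
        // Print one (userID, itemID) pair per line, like context.write would emit.
        for (long itemID : ids.subList(1, ids.size())) {
            System.out.println(userID + "\t" + itemID);
        }
    }
}
```

Unlike the mapper, this sketch returns an empty list for a line without digits instead of failing on m.group().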
Reducer - 1
public class WikipediaToUserVectorReducer extends
        Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
    @Override
    public void reduce(VarLongWritable userID,
                       Iterable<VarLongWritable> itemPrefs,
                       Context context)
            throws IOException, InterruptedException {
        Vector userVector = new RandomAccessSparseVector(
                Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int)itemPref.get(), 1.0f);
        }
        //LongWritable userID_lw = new LongWritable(userID.get());
        context.write(userID, new VectorWritable(userVector));
        //context.write(userID_lw, new VectorWritable(userVector));
    }
}
The reducer outputs a userID and a userVector. The output looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}, provided FileInputFormat and TextInputFormat are used in the driver.
I want to use another pair of mapper-reducer to process this data further:
Mapper - 2
public class UserVectorToCooccurenceMapper extends
        Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> {
    @Override
    public void map(VarLongWritable userID,
                    VectorWritable userVector,
                    Context context)
            throws IOException, InterruptedException {
        Iterator<Vector.Element> it = userVector.get().iterateNonZero();
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1),
                              new IntWritable(index2));
            }
        }
    }
}
Reducer - 2
public class UserVectorToCooccurenceReducer extends
        Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {
    @Override
    public void reduce(IntWritable itemIndex1,
                       Iterable<IntWritable> itemIndex2s,
                       Context context)
            throws IOException, InterruptedException {
        Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        // Count how often each item co-occurs with itemIndex1.
        for (IntWritable intWritable : itemIndex2s) {
            int itemIndex2 = intWritable.get();
            cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
        }
        context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
    }
}
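What the second mapper/reducer pair is meant to compute can be sketched in plain Java without Hadoop or Mahout. This is only an illustration under assumptions: the class name CooccurrenceSketch, the use of nested maps in place of sparse vectors, and the sample data are all invented here, and each user's item list is assumed to contain no duplicates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the co-occurrence count built by the second job.
final class CooccurrenceSketch {
    // For each item, counts how often every item appears in the same
    // user's list. The result maps itemIndex1 -> (itemIndex2 -> count).
    static Map<Integer, Map<Integer, Integer>> cooccurrence(List<List<Integer>> userItemLists) {
        Map<Integer, Map<Integer, Integer>> rows = new TreeMap<>();
        for (List<Integer> items : userItemLists) {
            // "Map" step: visit every (index1, index2) pair within one user.
            for (int index1 : items) {
                for (int index2 : items) {
                    // "Reduce" step: add 1 to the row for index1 at column index2.
                    rows.computeIfAbsent(index1, k -> new TreeMap<>())
                        .merge(index2, 1, Integer::sum);
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two hypothetical users: one with items 1, 2, 3 and one with items 2, 3.
        List<List<Integer>> users = Arrays.asList(
            Arrays.asList(1, 2, 3),
            Arrays.asList(2, 3));
        System.out.println(cooccurrence(users));
    }
}
```

With this sample data, items 2 and 3 co-occur for both users, so the row for item 2 holds a count of 2 at column 3, while item 1 co-occurs with each item only once.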
This is the driver I am using:
public final class RecommenderJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job_preferenceValues = new Job (getConf());
        job_preferenceValues.setJarByClass(RecommenderJob.class);
        job_preferenceValues.setJobName("job_preferenceValues");
        job_preferenceValues.setInputFormatClass(TextInputFormat.class);
        job_preferenceValues.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job_preferenceValues, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job_preferenceValues, new Path(args[1]));
        job_preferenceValues.setMapOutputKeyClass(VarLongWritable.class);
        job_preferenceValues.setMapOutputValueClass(VarLongWritable.class);
        job_preferenceValues.setOutputKeyClass(VarLongWritable.class);
        job_preferenceValues.setOutputValueClass(VectorWritable.class);
        job_preferenceValues.setMapperClass(WikipediaToItemPrefsMapper.class);
        job_preferenceValues.setReducerClass(WikipediaToUserVectorReducer.class);
        job_preferenceValues.waitForCompletion(true);
        Job job_cooccurence = new Job (getConf());
        job_cooccurence.setJarByClass(RecommenderJob.class);
        job_cooccurence.setJobName("job_cooccurence");
        job_cooccurence.setInputFormatClass(SequenceFileInputFormat.class);
        job_cooccurence.setOutputFormatClass(TextOutputFormat.class);
        SequenceFileInputFormat.setInputPaths(job_cooccurence, new Path(args[1]));
        FileOutputFormat.setOutputPath(job_cooccurence, new Path(args[2]));
        job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
        job_cooccurence.setMapOutputValueClass(VectorWritable.class);
        job_cooccurence.setOutputKeyClass(IntWritable.class);
        job_cooccurence.setOutputValueClass(VectorWritable.class);
        job_cooccurence.setMapperClass(UserVectorToCooccurenceMapper.class);
        job_cooccurence.setReducerClass(UserVectorToCooccurenceReducer.class);
        job_cooccurence.waitForCompletion(true);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new RecommenderJob(), args);
    }
}
The error that I get is:
java.io.IOException: Type mismatch in key from map: expected org.apache.mahout.math.VarLongWritable, received org.apache.hadoop.io.IntWritable
While Googling for a fix, I found that my issue is similar to this question. The difference is that I am already using SequenceFileInputFormat and SequenceFileOutputFormat, I believe correctly. I also see that org.apache.mahout.cf.taste.hadoop.item.RecommenderJob does more or less the same thing. My understanding, based on the Yahoo Tutorial, is:
SequenceFileOutputFormat rapidly serializes arbitrary data types to the file; the corresponding SequenceFileInputFormat will deserialize the file into the same types and presents the data to the next Mapper in the same manner as it was emitted by the previous Reducer.
What am I doing wrong? I would really appreciate some pointers. I spent the day trying to fix this and got nowhere :(
Reputation: 30089
Your second mapper has the following signature:
public class UserVectorToCooccurenceMapper extends
Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable>
But you define the following in your driver code:
job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
job_cooccurence.setMapOutputValueClass(VectorWritable.class);
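Those two settings declare the types the mapper emits, not the types it reads, so they must match the last two type parameters of the mapper. The mapper emits, and the reducer expects as input, <IntWritable, IntWritable>, so you should just amend your driver code to: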
job_cooccurence.setMapOutputKeyClass(IntWritable.class);
job_cooccurence.setMapOutputValueClass(IntWritable.class);
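As far as I understand it, the exception comes from a check the framework applies to every key the mapper writes: the runtime class of the key is compared with the class declared through setMapOutputKeyClass. The sketch below mimics that check in plain Java so it can be run without Hadoop. Everything in it is a stand-in: MapOutputTypeCheckSketch, IntWritableStub and VarLongWritableStub are hypothetical names, not the real Hadoop or Mahout classes, and the message text is copied from the error in the question.

```java
import java.io.IOException;

// Plain-Java imitation of the map output key type check; not Hadoop code.
final class MapOutputTypeCheckSketch {
    // Hypothetical stand-ins for IntWritable and VarLongWritable.
    static final class IntWritableStub {}
    static final class VarLongWritableStub {}

    // Rejects a key whose runtime class differs from the declared class,
    // using the same message format as the error in the question.
    static void collect(Object key, Class<?> declaredKeyClass) throws IOException {
        if (key.getClass() != declaredKeyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                + declaredKeyClass.getName() + ", received " + key.getClass().getName());
        }
    }

    public static void main(String[] args) throws IOException {
        // Declared class matches what the mapper emits: accepted.
        collect(new IntWritableStub(), IntWritableStub.class);
        // Declared class differs from what the mapper emits: rejected.
        try {
            collect(new IntWritableStub(), VarLongWritableStub.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The second call corresponds to the driver in the question: the mapper writes IntWritable keys while the job declares VarLongWritable as the map output key class.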