Alps

Reputation: 93

Type mismatch in key from map, using SequenceFileInputFormat correctly

I am trying to run the recommender example from chapter 6 (listings 6.1 to 6.4) of the ebook Mahout in Action. There are two mapper/reducer pairs. Here is the code:

Mapper - 1

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public class WikipediaToItemPrefsMapper extends
        Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {

    // Matches each run of digits on a line.
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    @Override
    public void map(LongWritable key,
                    Text value,
                    Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);

        // The first number on the line is the user ID; each following
        // number is an item ID the user expressed a preference for.
        m.find();
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
        VarLongWritable itemID = new VarLongWritable();
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}

Reducer - 1

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikipediaToUserVectorReducer extends
        Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {

    @Override
    public void reduce(VarLongWritable userID,
                       Iterable<VarLongWritable> itemPrefs,
                       Context context)
            throws IOException, InterruptedException {

        // Collect all of this user's item preferences into one sparse
        // vector, indexed by item ID.
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f);
        }

        //LongWritable userID_lw = new LongWritable(userID.get());
        context.write(userID, new VectorWritable(userVector));
        //context.write(userID_lw, new VectorWritable(userVector));
    }
}

The reducer outputs a userID and a userVector, and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}, provided FileInputFormat and TextInputFormat are used in the driver.
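
For reference, the {index:value} pairs are just the text form of a Mahout sparse vector. A minimal standalone sketch, assuming only Mahout on the classpath, reproduces roughly that representation (the class name is illustrative only):

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorFormatDemo {

    public static void main(String[] args) {
        Vector v = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        v.set(1, 1.0);
        v.set(2, 1.0);
        v.set(590, 1.0);
        // Prints something like {590:1.0,2:1.0,1:1.0}; element order is
        // unspecified for a hash-backed sparse vector.
        System.out.println(v);
    }
}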

I want to use another pair of mapper-reducer to process this data further:

Mapper - 2

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorToCooccurenceMapper extends
        Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> {

    @Override
    public void map(VarLongWritable userID,
                    VectorWritable userVector,
                    Context context)
            throws IOException, InterruptedException {

        // Emit every ordered pair of items that co-occur in this
        // user's preference vector.
        Iterator<Vector.Element> it = userVector.get().iterateNonZero();
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1),
                              new IntWritable(index2));
            }
        }
    }
}

Reducer - 2

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorToCooccurenceReducer extends
        Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {

    @Override
    public void reduce(IntWritable itemIndex1,
                       Iterable<IntWritable> itemIndex2s,
                       Context context)
            throws IOException, InterruptedException {

        // Count how often each item co-occurs with itemIndex1.
        Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (IntWritable intWritable : itemIndex2s) {
            int itemIndex2 = intWritable.get();
            cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
        }
        context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
    }
}

This is the driver I am using:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public final class RecommenderJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        // Job 1: parse the raw text input into (userID, userVector)
        // pairs and write them out as a SequenceFile.
        Job job_preferenceValues = new Job(getConf());
        job_preferenceValues.setJarByClass(RecommenderJob.class);
        job_preferenceValues.setJobName("job_preferenceValues");

        job_preferenceValues.setInputFormatClass(TextInputFormat.class);
        job_preferenceValues.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job_preferenceValues, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job_preferenceValues, new Path(args[1]));

        job_preferenceValues.setMapOutputKeyClass(VarLongWritable.class);
        job_preferenceValues.setMapOutputValueClass(VarLongWritable.class);

        job_preferenceValues.setOutputKeyClass(VarLongWritable.class);
        job_preferenceValues.setOutputValueClass(VectorWritable.class);

        job_preferenceValues.setMapperClass(WikipediaToItemPrefsMapper.class);
        job_preferenceValues.setReducerClass(WikipediaToUserVectorReducer.class);

        job_preferenceValues.waitForCompletion(true);

        // Job 2: read job 1's SequenceFile back in and build the
        // co-occurrence rows.
        Job job_cooccurence = new Job(getConf());
        job_cooccurence.setJarByClass(RecommenderJob.class);
        job_cooccurence.setJobName("job_cooccurence");

        job_cooccurence.setInputFormatClass(SequenceFileInputFormat.class);
        job_cooccurence.setOutputFormatClass(TextOutputFormat.class);

        SequenceFileInputFormat.setInputPaths(job_cooccurence, new Path(args[1]));
        FileOutputFormat.setOutputPath(job_cooccurence, new Path(args[2]));

        job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
        job_cooccurence.setMapOutputValueClass(VectorWritable.class);

        job_cooccurence.setOutputKeyClass(IntWritable.class);
        job_cooccurence.setOutputValueClass(VectorWritable.class);

        job_cooccurence.setMapperClass(UserVectorToCooccurenceMapper.class);
        job_cooccurence.setReducerClass(UserVectorToCooccurenceReducer.class);

        job_cooccurence.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new RecommenderJob(), args);
    }
}

The error that I get is:

java.io.IOException: Type mismatch in key from map: expected org.apache.mahout.math.VarLongWritable, received org.apache.hadoop.io.IntWritable

In the course of Googling for a fix, I found that my issue is similar to this question. The difference is that I am already using SequenceFileInputFormat and SequenceFileOutputFormat, correctly as far as I can tell. I also see that org.apache.mahout.cf.taste.hadoop.item.RecommenderJob does more or less the same thing. In my understanding, and according to the Yahoo Tutorial:

SequenceFileOutputFormat rapidly serializes arbitrary data types to the file; the corresponding SequenceFileInputFormat will deserialize the file into the same types and present the data to the next Mapper in the same manner as it was emitted by the previous Reducer.
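
One way to verify what job 1 actually wrote is to read the SequenceFile header, which records the exact key and value classes. A minimal sketch of such a check, with the part-file path passed in as an argument (the class name is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class InspectSequenceFile {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path part = new Path(args[0]); // e.g. a part-r-00000 file from job 1
        SequenceFile.Reader reader =
                new SequenceFile.Reader(FileSystem.get(conf), part, conf);
        try {
            // Expected here: org.apache.mahout.math.VarLongWritable and
            // org.apache.mahout.math.VectorWritable.
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        } finally {
            reader.close();
        }
    }
}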

What am I doing wrong? I would really appreciate some pointers; I have spent the whole day trying to fix this and got nowhere :(

Upvotes: 2

Views: 2469

Answers (1)

Chris White

Reputation: 30089

Your second mapper has the following signature:

public class UserVectorToCooccurenceMapper extends 
        Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable>

But you define the following in your driver code:

job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
job_cooccurence.setMapOutputValueClass(VectorWritable.class);

The reducer is expecting <IntWritable, IntWritable> as input, so you should just amend your driver code to:

job_cooccurence.setMapOutputKeyClass(IntWritable.class);
job_cooccurence.setMapOutputValueClass(IntWritable.class);
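
With that change the second job's types line up end to end: the mapper emits (IntWritable, IntWritable) pairs, and the reducer consumes them and emits (IntWritable, VectorWritable), matching the output classes you already set. The type-related driver lines for job_cooccurence would then read:

// Map output types match UserVectorToCooccurenceMapper's declaration
job_cooccurence.setMapOutputKeyClass(IntWritable.class);
job_cooccurence.setMapOutputValueClass(IntWritable.class);

// Final output types match UserVectorToCooccurenceReducer's declaration
job_cooccurence.setOutputKeyClass(IntWritable.class);
job_cooccurence.setOutputValueClass(VectorWritable.class);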

Upvotes: 2
