Punit Naik

Reputation: 515

How to update MapReduce job parameters inside Mapper

I want to update, from inside the Mapper class, a parameter that I originally set in the Driver class.

I tried doing,

context.getConfiguration().set("arg", "updatedvalue")

inside the mapper. The value did get updated there, but the reducer's outputs were all zeros.

Please help.

Mapper:-

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RecMap extends Mapper<LongWritable, Text, Text, Text> {
    public static TreeMap<String,Integer> co_oc_mat=new TreeMap<String,Integer>();
    public static HashMap<String,Float> user_scoring_mat=new HashMap<String,Float>();
    public static TreeMap<String,Float> sorted_user_scoring_mat=new TreeMap<String,Float>();
    public static ArrayList<String> vals=new ArrayList<String>();
    public static ArrayList<Integer> unique_items=new ArrayList<Integer>();
    public static ArrayList<Integer> unique_users=new ArrayList<Integer>();
    public static int a=0;
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        ++a;
        String b=value.toString();
        vals.add(b);
        String[] parts=b.split("\\,");
        user_scoring_mat.put(parts[0]+","+parts[1], Float.parseFloat(parts[2]));
    }
    @Override
    public void cleanup(Context context) throws IOException, InterruptedException{
        co_oc_mat.putAll(new get_co_oc_mat().get(vals, a));
        unique_users.addAll(new get_unique_users().get(vals, a));
        unique_items.addAll(new get_unique_items().get(vals, a));
        for(int i=0;i<unique_users.size();i++){
            for(int j=0;j<unique_items.size();j++){
                if(!user_scoring_mat.containsKey(unique_users.get(i)+","+unique_items.get(j))){
                    user_scoring_mat.put(unique_users.get(i)+","+unique_items.get(j), 0.0f);
                }
            }
        }
        sorted_user_scoring_mat.putAll(user_scoring_mat);
        String prev="null";int row_num=-1;String value="A";
        String prev2="null";int col_num=-1;String value2="B";

        //Transmitting co_oc_mat
        for(Entry<String, Integer> entry: co_oc_mat.entrySet()){
            String check_val=entry.getKey().split("\\,")[0];
            if(!prev.contentEquals(check_val)){
                if(row_num==-1){
                    prev=check_val;
                    ++row_num;
                }
                else{
                    for(int i=0;i<unique_users.size();i++){
                        String key=row_num+","+i;
                        context.write(new Text(key), new Text(value));
                    }
                    value="A";
                    prev=check_val;
                    ++row_num;
                }
            }
            value=value+","+entry.getValue();
        }
        for(int i=0;i<unique_users.size();i++){
            String key=row_num+","+i;
            context.write(new Text(key), new Text(value));
        }

        //Transmitting sorted_user_scoring_mat
        for(Entry<String, Float> entry: sorted_user_scoring_mat.entrySet()){
            //context.write(new Text(entry.getKey()), new Text(String.valueOf(entry.getValue())));
            String check_val=entry.getKey().split("\\,")[0];
            if(!prev2.contentEquals(check_val)){
                if(col_num==-1){
                    prev2=check_val;
                    ++col_num;
                }
                else{
                    for(int i=0;i<unique_items.size();i++){
                        String key=i+","+col_num;
                        context.write(new Text(key), new Text(value2));
                    }
                    value2="B";
                    prev2=check_val;
                    ++col_num;
                }
            }
            value2=value2+","+entry.getValue();
        }
        for(int i=0;i<unique_items.size();i++){
            String key=i+","+col_num;
            context.write(new Text(key), new Text(value2));
        }
        context.getConfiguration().setInt("n", unique_items.size());
    }
}

Reducer:-

import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class RecReduce extends Reducer<Text, Text, Text, Text> {
    public static int n=0;
    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        n=context.getConfiguration().getInt("n", 1);
    }
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String[] value;
        HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
        HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
        for (Text val : values) {
            value = val.toString().split(",");
            if (value[0].equals("A")) {
                for(int z=1;z<=n;z++){
                    hashA.put(z, Float.parseFloat(value[z]));}
            } else{
                for(int a=1;a<=n;a++){
                    hashB.put(a, Float.parseFloat(value[a]));}
            }
        }
        float result = 0.0f;
        float a_ij;
        float b_jk;
        for (int j=1;j<=n;j++) {
            a_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
            b_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
            result +=a_ij*b_jk;
        }
        context.write(null, new Text(key.toString() + "," + Float.toString(result)));
    }
}

Driver:-

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class RecDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setInt("n", 0);
        Job job = new Job(conf, "Recommendations_CollaborativeFiltering");
        job.setJarByClass(RecDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(RecMap.class);
        //job.setNumReduceTasks(0);
        //Don't use combiner if there is no scope of combining the output. Otherwise the job will get stuck.
        //job.setCombinerClass(RecReduce.class);
        job.setReducerClass(RecReduce.class);

        FileInputFormat.addInputPath(job, new Path("/home/gts1/Desktop/recommendation.txt"));

        FileOutputFormat.setOutputPath(job, new Path("/home/gts1/Desktop/rec1_out"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

And this was the output that I got:-

0,0,0.0
0,1,0.0
0,2,0.0
0,3,0.0
0,4,0.0
1,0,0.0
1,1,0.0
1,2,0.0
1,3,0.0
1,4,0.0
2,0,0.0
2,1,0.0
2,2,0.0
2,3,0.0
2,4,0.0
3,0,0.0
3,1,0.0
3,2,0.0
3,3,0.0
3,4,0.0

Upvotes: 0

Views: 552

Answers (1)

Praveen Sripati

Reputation: 33545

As mentioned in the Hadoop API documentation, JobContext provides "a read-only view of the job that is provided to the tasks while they are running". So it is possible to read a parameter value from the context inside the mapper/reducer methods, but not to set it: a change made with context.getConfiguration().set() inside a task only affects that task's local copy of the configuration and is never propagated to other tasks, which is why the reducer still sees the value set in the Driver (n = 0 here) and emits all zeros.
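If the value can be computed before the job is submitted, the simplest fix is to set it in the Driver, so that it becomes part of the serialized job configuration every task receives. A minimal sketch, assuming the item count can be determined up front (the countUniqueItems helper is hypothetical, only to show where the value would come from):

// Set job-wide parameters in the Driver, before submission: the Configuration
// is serialized at that point and shipped to every map and reduce task.
Configuration conf = new Configuration();
int n = countUniqueItems(new Path("/home/gts1/Desktop/recommendation.txt")); // hypothetical helper
conf.setInt("n", n); // now visible to every mapper and reducer via context.getConfiguration()
Job job = new Job(conf, "Recommendations_CollaborativeFiltering");
// ... set mapper/reducer classes and input/output paths as in the Driver above ...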

When such coordination is needed at runtime across processes running on different machines, an external coordination service such as Apache ZooKeeper has to be used: set the value from the mapper and read the same value in the reducer.
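A rough sketch of that approach, assuming a ZooKeeper ensemble reachable at localhost:2181; the helper class, znode path, and connect string are illustrative, and error handling is kept to a minimum:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkParam {
    private static final String ZK_HOSTS = "localhost:2181"; // assumed ensemble address
    private static final String NODE = "/rec_n";              // assumed znode path

    // Called from the mapper's cleanup(): publish the computed value.
    public static void publish(int n) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_HOSTS, 10000, event -> { });
        try {
            zk.create(NODE, String.valueOf(n).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // another map task already published the value; ignore
        } finally {
            zk.close();
        }
    }

    // Called from the reducer's setup(): read the value back.
    public static int read(int defaultValue) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_HOSTS, 10000, event -> { });
        try {
            return Integer.parseInt(new String(zk.getData(NODE, false, null)));
        } catch (KeeperException.NoNodeException e) {
            return defaultValue; // no map task has published the value yet
        } finally {
            zk.close();
        }
    }
}

With a helper like this, the mapper's cleanup() would call ZkParam.publish(unique_items.size()) and the reducer's setup() would call ZkParam.read(1) instead of context.getConfiguration().getInt("n", 1).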

Upvotes: 1
