JustTheAverageGirl

Reputation: 3323

Hadoop: Mapreduce - Sum of data (Java)

After running my MapReduce job, this is the output:

User16565   Logins: 1   Orders:1
User16566   Logins: 2   Orders:2
User16567   Logins: 1   Orders:1

Everything looks great, but when the log file has thousands of entries, this per-user output is not very helpful. Is there a way to change my code to sum up all the "Logins" and "Orders" so I can calculate the difference?

Edit: New Question/Problem

Log Example:

2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-

I found an error in my code: the logins and orders aren't counted correctly. At first the output seemed correct, but when I checked the logins and orders manually I realized there is an error. Output:

User73511   Logins: 3   Orders:2
User14253   Logins: 1   Orders:1

Should be:

User73511   Logins: 1   Orders:2
User14253   Logins: 1   Orders:0

Here is the whole code:

public class UserOrderCount {

    public static class SingleUserMapper extends
            Mapper<LongWritable, Text, Text, CountInformationTuple> {

        private Text outUserId = new Text();
        private CountInformationTuple outCountOrder = new CountInformationTuple();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String tempString = value.toString();
            String[] singleUserData = tempString.split(",");
            String userId = singleUserData[3];
            String featureId = singleUserData[1];

            if (featureId.contains("feature:order-created")) {
                outCountOrder.setCountOrder(1);
            }
            if (featureId.contains("feature:login")) {
                outCountOrder.setCountLogin(1);
            }

            outUserId.set(userId);
            context.write(outUserId, outCountOrder);
        }
    }

    public static class SingleUserReducer extends
            Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {

        private CountInformationTuple result = new CountInformationTuple();

        public void reduce(Text key, Iterable<CountInformationTuple> values,
                Context context) throws IOException, InterruptedException {

            int login = 0;
            int order = 0;

            for (CountInformationTuple val : values) {
                login += val.getCountLogin();
                order += val.getCountOrder();
            }

            result.setCountLogin(login);
            result.setCountOrder(order);

            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: UserOrderCount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf);
        job.setJobName("UserOrderCount");
        job.setJarByClass(UserOrderCount.class);

        job.setMapperClass(SingleUserMapper.class);
        job.setCombinerClass(SingleUserReducer.class);
        job.setReducerClass(SingleUserReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountInformationTuple.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class CountInformationTuple implements Writable {
        private int countOrder = 0;
        private int countLogin = 0;

        public int getCountOrder() {
            return countOrder;
        }

        public void setCountOrder(int order) {
            this.countOrder = order;
        }

        public int getCountLogin() {
            return countLogin;
        }

        public void setCountLogin(int login) {
            this.countLogin = login;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            countOrder = in.readInt();
            countLogin = in.readInt();

        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(countLogin);
            out.writeInt(countOrder);

        }

        @Override
        public String toString() {
            return "Logins: "+ countLogin + "\t" + "Orders:" + countOrder;
        }
    }
}

Upvotes: 1

Views: 8877

Answers (2)

JustTheAverageGirl

Reputation: 3323

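For anyone interested: I solved my wrong-output error. The mapper now starts from fresh, zeroed counters for every record instead of only setting fields on a reused tuple, and the tuple class is rewritten as UserCountTuple: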

public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String tempString = value.toString();
        String[] stringData = tempString.split(",");

        String userID = stringData[3];
        String featureID = stringData[1];

        int login = 0;
        int order = 0;

        // equals() is enough here; matches() would treat the argument as a regex
        if (featureID.equals("feature:login")) {
            login++;
        } else if (featureID.equals("feature:order-created")) {
            order++;
        }

        outUserID.set(userID);
        outUserCount.set(login, order);

        context.write(outUserID, outUserCount);

    }
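As far as can be told from the posted code, a likely cause of the wrong counts is that the original mapper kept one CountInformationTuple as a field and only ever set its counters to 1, never back to 0, so values from earlier lines leaked into later records. The following is a plain-Java simulation of that effect, not Hadoop code; the class ReusedTupleDemo, its Counts holder, and the helper methods are hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Hadoop required) of a mapper that reuses one
// mutable output object across records. All names here are hypothetical.
class ReusedTupleDemo {

    // Stand-in for the reused tuple: a mutable pair of counters.
    static final class Counts {
        int logins;
        int orders;
    }

    // Returns per-user sums as {logins, orders}.
    // With resetPerRecord == false the counters keep their values from
    // earlier lines, mimicking a reused Writable that is never cleared.
    static Map<String, int[]> countPerUser(List<String> lines, boolean resetPerRecord) {
        Map<String, int[]> totals = new LinkedHashMap<>();
        Counts reused = new Counts();
        for (String line : lines) {
            String[] fields = line.split(",");
            String feature = fields[1];
            String user = fields[3];
            if (resetPerRecord) {
                reused.logins = 0;
                reused.orders = 0;
            }
            if (feature.equals("feature:login")) {
                reused.logins = 1;
            } else if (feature.equals("feature:order-created")) {
                reused.orders = 1;
            }
            // "Emit" the current state of the reused object and sum it per user.
            int[] sum = totals.computeIfAbsent(user, u -> new int[2]);
            sum[0] += reused.logins;
            sum[1] += reused.orders;
        }
        return totals;
    }

    // The four log lines from the question.
    static List<String> sampleLog() {
        return Arrays.asList(
            "2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-",
            "2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-",
            "2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-",
            "2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-");
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            System.out.println(reset ? "With reset:" : "Without reset:");
            countPerUser(sampleLog(), reset).forEach((user, c) ->
                System.out.println(user + "\tLogins: " + c[0] + "\tOrders:" + c[1]));
        }
    }
}
```

On the four sample lines, the run without a reset gives User73511 three logins and two orders and User14253 one of each, which matches the wrong output in the question. The run with a reset gives the expected 1/2 and 1/0.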

public static class UserCountTuple implements Writable {

        private IntWritable countLogin;
        private IntWritable countOrder;

        public UserCountTuple() {
            set(new IntWritable(0), new IntWritable(0));
        }

        public void set(int countLogin, int countOrder) {
            this.countLogin.set(countLogin);
            this.countOrder.set(countOrder);
        }

        public void set(IntWritable countLogin, IntWritable countOrder) {
            this.countLogin = countLogin;
            this.countOrder = countOrder;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            countLogin.readFields(in);
            countOrder.readFields(in);

        }

        @Override
        public void write(DataOutput out) throws IOException {
            countLogin.write(out);
            countOrder.write(out);

        }

        public IntWritable getLogin() {
            return countLogin;
        }

        public IntWritable getOrder() {
            return countOrder;
        }

        @Override
        public String toString() {
            return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
        }

    }
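One detail worth noting about the tuple above: readFields and write handle the fields in the same order (login, then order). The CountInformationTuple in the question writes countLogin first but reads countOrder first, which would swap the two counts on every serialization round trip. Whether that shows up in the final output depends on how often the values are serialized (for example, whether the combiner runs). Below is a minimal sketch of the effect using only java.io streams; the class FieldOrderDemo and its methods are hypothetical and only mimic what a Writable does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo of why write() and readFields() must agree on field order.
class FieldOrderDemo {

    // Serializes login first, then order, like the write() methods in this thread.
    static byte[] write(int login, int order) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(login);
            out.writeInt(order);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads in the same order as write(); returns {login, order}.
    static int[] readSameOrder(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int login = in.readInt();
            int order = in.readInt();
            return new int[] {login, order};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads order first, as the question's readFields() does; returns {login, order}.
    static int[] readSwappedOrder(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int order = in.readInt();
            int login = in.readInt();
            return new int[] {login, order};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(3, 2); // 3 logins, 2 orders
        int[] same = readSameOrder(data);
        int[] swapped = readSwappedOrder(data);
        System.out.println("same order:    Logins: " + same[0] + " Orders:" + same[1]);
        System.out.println("swapped order: Logins: " + swapped[0] + " Orders:" + swapped[1]);
    }
}
```

Reading in the swapped order turns 3 logins and 2 orders into 2 logins and 3 orders, so keeping both methods in the same field order avoids a result that depends on the number of round trips.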

Upvotes: 3

harpun

Reputation: 4110

As you want a single file as the result, you could configure your MapReduce job to use only one reduce task. With the Job class used in your driver that is job.setNumReduceTasks(1); with the older JobConf API it is jobConf.setNumReduceTasks(1), see the JobConf JavaDoc for more information.

Now your one and only reduce task gets all the login and order counts for every user. You can sum the login and order values of all processed records in the reduce task and output the totals in the cleanup() method, which is called only once, after all input records of the single reduce task have been processed. Example code:

public static class SingleUserReducer extends
        Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {

    private CountInformationTuple result = new CountInformationTuple();
    private int login = 0;
    private int order = 0;

    public void reduce(Text key, Iterable<CountInformationTuple> values,
            Context context) throws IOException, InterruptedException {

        for (CountInformationTuple val : values) {
            login += val.getCountLogin();
            order += val.getCountOrder();
        }
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        result.setCountLogin(login);
        result.setCountOrder(order);

        context.write(new Text("total"), result);
    }
}

You get a single record as output with the total sum of login and order. You can modify the cleanup() method to compute the difference and other measures if needed.
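The accumulate-in-reduce, emit-in-cleanup pattern, extended with the difference, can be tried without a cluster. This is a plain-Java sketch under the assumption that each value is a {logins, orders} pair; the class TotalsDemo and its method signatures are hypothetical stand-ins for the Hadoop Reducer lifecycle, not the real API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a single reduce task: reduce() is called once per
// user key and only accumulates; cleanup() is called once at the very end.
class TotalsDemo {
    private int login = 0;
    private int order = 0;

    // Mirrors reduce(): add this user's {logins, orders} pairs to the running totals.
    void reduce(String userId, List<int[]> values) {
        for (int[] val : values) {
            login += val[0];
            order += val[1];
        }
    }

    // Mirrors cleanup(): returns {total logins, total orders, logins - orders}.
    int[] cleanup() {
        return new int[] {login, order, login - order};
    }

    public static void main(String[] args) {
        TotalsDemo reducer = new TotalsDemo();
        // Per-user values taken from the corrected example in the question.
        reducer.reduce("User73511", Arrays.asList(
            new int[] {1, 0}, new int[] {0, 1}, new int[] {0, 1}));
        reducer.reduce("User14253", Arrays.asList(new int[] {1, 0}));
        int[] total = reducer.cleanup();
        System.out.println("total\tLogins: " + total[0] + "\tOrders:" + total[1]
            + "\tDifference: " + total[2]);
    }
}
```

Because the totals live in instance fields, this only yields a global sum when a single reduce task sees every key, which is why the setting of one reduce task matters here.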

Upvotes: 1
