Reputation: 3323
After using my mapreduce job this is the output:
User16565 Logins: 1 Orders:1
User16566 Logins: 2 Orders:2
User16567 Logins: 1 Orders:1
Everything looks great, but when the log-file has thousands of entries it is not very helpful. Is there a way to change my code to sum up the "Logins" and "Orders" so I can calculate the difference?
Edit: New Question/Problem
Log Example:
2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-
I found an error in my code. I realized that the Logins & Orders aren't count correctly. At first it seemed that the output is correct but when i checked the logins & orders manually i realized that there is an error. Output:
User73511 Logins: 3 Orders:2
User14253 Logins: 1 Orders:1
Should be:
User73511 Logins: 1 Orders:2
User14253 Logins: 1 Orders:0
Here is the whole code:
public class UserOrderCount {
public static class SingleUserMapper extends
Mapper<LongWritable, Text, Text, CountInformationTuple> {
private Text outUserId = new Text();
private CountInformationTuple outCountOrder = new CountInformationTuple();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String tempString = value.toString();
String[] singleUserData = tempString.split(",");
String userId = singleUserData[3];
String featureId = singleUserData[1];
if (featureId.contains("feature:order-created")) {
outCountOrder.setCountOrder(1);
}
if (featureId.contains("feature:login")) {
outCountOrder.setCountLogin(1);
}
outUserId.set(userId);
context.write(outUserId, outCountOrder);
}
}
public static class SingleUserReducer extends
Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
private CountInformationTuple result = new CountInformationTuple();
public void reduce(Text key, Iterable<CountInformationTuple> values,
Context context) throws IOException, InterruptedException {
int login = 0;
int order = 0;
for (CountInformationTuple val : values) {
login += val.getCountLogin();
order += val.getCountOrder();
}
result.setCountLogin(login);
result.setCountOrder(order);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: UserOrderCount <in> <out>");
System.exit(2);
}
Job job = new Job(conf);
job.setJobName("UserOrderCount");
job.setJarByClass(UserOrderCount.class);
job.setMapperClass(SingleUserMapper.class);
job.setCombinerClass(SingleUserReducer.class);
job.setReducerClass(SingleUserReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountInformationTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class CountInformationTuple implements Writable {
private int countOrder = 0;
private int countLogin = 0;
public int getCountOrder() {
return countOrder;
}
public void setCountOrder(int order) {
this.countOrder = order;
}
public int getCountLogin() {
return countLogin;
}
public void setCountLogin(int login) {
this.countLogin = login;
}
@Override
public void readFields(DataInput in) throws IOException {
countOrder = in.readInt();
countLogin = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(countLogin);
out.writeInt(countOrder);
}
@Override
public String toString() {
return "Logins: "+ countLogin + "\t" + "Orders:" + countOrder;
}
}
}
Upvotes: 1
Views: 8877
Reputation: 3323
For the one interessted: Solved my "wrong-output"-error.
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String tempString = value.toString();
String[] stringData = tempString.split(",");
String userID = stringData[3];
String featureID = stringData[1];
int login = 0;
int order = 0;
if (featureID.matches("feature:login")) {
login++;
} else if (featureID.matches("feature:order-created")) {
order++;
}
outUserID.set(userID);
outUserCount.set(login, order);
context.write(outUserID, outUserCount);
}
public static class UserCountTuple implements Writable {
private IntWritable countLogin;
private IntWritable countOrder;
public UserCountTuple() {
set(new IntWritable(0), new IntWritable(0));
}
public void set(int countLogin, int countOrder) {
this.countLogin.set(countLogin);
this.countOrder.set(countOrder);
}
public void set(IntWritable countLogin, IntWritable countOrder) {
this.countLogin = countLogin;
this.countOrder = countOrder;
}
@Override
public void readFields(DataInput in) throws IOException {
countLogin.readFields(in);
countOrder.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
countLogin.write(out);
countOrder.write(out);
}
public IntWritable getLogin() {
return countLogin;
}
public IntWritable getOrder() {
return countOrder;
}
@Override
public String toString() {
return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
}
}
Upvotes: 3
Reputation: 4110
As you want to have a single file as the result you could configure your MapReduce job using jobConf.setNumReduceTasks(1)
to use a single reduce task only, see JobConf JavaDoc for more information.
Now your one and only reduce task gets the all login
and order
counts for every user. You can just sum all the login
and order
values of the processed records in your reduce task and output the summed value in the cleanup() method, which is called only once after all input records to the single reduce task are processed. Example code:
public static class SingleUserReducer extends
Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
private CountInformationTuple result = new CountInformationTuple();
private int login = 0;
private int order = 0;
public void reduce(Text key, Iterable<CountInformationTuple> values,
Context context) throws IOException, InterruptedException {
for (CountInformationTuple val : values) {
login += val.getCountLogin();
order += val.getCountOrder();
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
result.setCountLogin(login);
result.setCountOrder(order);
context.write(new Text("total"), result);
}
}
You get a single record as output with the total sum of login
and order
. You can modify the cleanup()
method to compute the difference and other measures if needed.
Upvotes: 1