Dumbo The Elephant

Reputation: 31

Hadoop mapreduce 2 files filtering?

I am required to print out the "Name" of those customers who do not have any order numbers. I understand that I have to use a mapper method to instantiate the variables, and that I have to use 2 mappers as well since there are 2 input files. At the reduce phase, I have to filter out the customers without an Order.Num. However, how do I filter out those customers that do not have an order number?

File1.txt

 Cust.No. Name

 1        Adam
 2        Abe
 3        Alex
 4        Jones

File2.txt

    Order.Num.    Cust.No.     Price
    01            1            5422
    02            1            23
    03            2            1265
    04            3            127

What I did

Initially, at the reducer method, I looped over the keys and checked if each one matches the existing key:

if (!(Data[0].equals("key")))
    {
        System.out.println(Data[1]);
    }

However, it prints every line.

Upvotes: 0

Views: 445

Answers (1)

Coursal

Reputation: 1397

This seems like a regular reduce-side join, so it may look like a plain use case; however, those kinds of computations tend to get really brutal on the workload side of things. This means that we must find ways to cut corners to make sure that the application scales well enough for larger inputs.

The most common way of saving time/space in the application's execution is either to design the (potentially several) MR jobs in a way where we can "cut" one or more jobs while keeping all of the functionality, or to minimize the number of (custom) mappers implemented over the input data. The latter is quite common for the kind of filtering you are trying to achieve, because we can easily use just one Map function where each instance checks the name of the file it is currently reading and acts accordingly.

More specifically, we can get the File1.txt and File2.txt file names before the mappers start running, through the setup function of the Map class, and use the name of the file currently being read to determine how to split the data from the files and store it into key-value pairs. For your problem, this Map function will output two types of key-value pairs:

  • <customer_ID, customer_name> (for data in File1.txt)

  • <customer_ID, order_ID> (for data in File2.txt)

Then the Reduce function will run once for each customer (because customer IDs and names are unique, of course) and access the grouped values, which are nothing more than a number of Text objects that hold either this customer's name or an order ID. We only want to output the customers without any orders on their records, so all we have to do is check whether this list of values has a length of 1 (i.e. whether there are no values for this customer other than the one with their name).
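
With the sample input, the grouped values arriving at the reducers would look roughly like this (the order of the values within a group is not guaranteed); customer 4 is the only one with a single value, so he is the only one written to the output:

    1 -> [Adam, 01, 02]
    2 -> [Abe, 03]
    3 -> [Alex, 04]
    4 -> [Jones]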

To showcase this, I put both of the input files in an input directory in HDFS (I used two-tab delimiters for the columns of File1.txt and three-tab delimiters for the columns of File2.txt; in case your files have different tabs or spaces between the columns, you can change the delimiters accordingly):

File1.txt

Cust.No Name
1       Adam
2       Abe
3       Alex
4       Jones

File2.txt

Order.Num.  Cust.No.    Price
01          1           5422
02          1           23
03          2           1265
04          3           127

The program that does the filtering can look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OrderListFilter
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <customer_ID, customer_name> OR <customer_ID, order_ID>
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text>
    {
        private String current_filename = "";

        protected void setup(Context context)
        {
            // get the name of the current to-be-read file
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            current_filename = path.getName();
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            if(current_filename.equals("File1.txt"))    // if mapper is reading through the customer's file
            {
                if(value.toString().contains("Cust.No"))    // remove header
                    return;
                else
                {
                    String[] columns = value.toString().split("\t\t");  // 2 tabs as delimiter

                    // write customer ID as key and name as value
                    context.write(new Text(columns[0]), new Text(columns[1]));
                }
            }
            else if(current_filename.equals("File2.txt"))   // if mapper is reading through the order's file
            {
                if(value.toString().contains("Cust.No"))    // remove header
                    return;
                else
                {
                    String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter

                    // write customer ID as key and order num as value
                    context.write(new Text(columns[1]), new Text(columns[0]));
                }
            }
        }
    }

    /* input: <customer_ID, customer_name> OR <customer_ID, order_ID>
     * output: <customer_ID, customer_name>
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            List<String> customer_records = new ArrayList<String>();

            // put all the values in a list to find the size of them
            for(Text value : values)
                customer_records.add(value.toString());

            // if there's only one record, i.e. just the ID and the customer's name in the key-value pairs,
            // write their ID and name to output
            if(customer_records.size() == 1)
                context.write(key, new Text(customer_records.get(0)));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("output");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Order List Filter");
        job.setJarByClass(OrderListFilter.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input_dir);
        FileOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}
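
If the columns in your actual files are not separated by exactly two or three tabs, a more forgiving alternative (just a sketch, assuming none of the column values themselves contain whitespace) is to split each line on any run of whitespace instead of a fixed number of tabs:

    // split on any run of tabs/spaces instead of an exact tab count
    String[] columns = value.toString().trim().split("\\s+");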

And its output seems a-ok: for this sample input, the only customer written out is 4 Jones, the one customer without any orders.

Upvotes: 1
