bcarthic

Reputation: 83

Read Hive table from MapReduce

I am currently writing a MapReduce program to find the difference between two Hive tables. My Hive tables are partitioned on one or more columns, so the folder names contain the values of the partition columns.

Is there any way to read a partitioned Hive table?

Can it be read in the mapper?

Upvotes: 2

Views: 4797

Answers (2)

lastoneisbearfood

Reputation: 4125

Yes, it can be read in the Mapper pretty easily. This answer is based on the idea mentioned by @Daniel Koverman.

With the Context object passed to Mapper.map(), you can get the file split path this way:

// this gives you the path plus offsets, e.g.
// hdfs://.../tablename/partition1=20/partition2=ABC/000001_0:0+12345678
ctx.getInputSplit().toString();

// or this gets you the path only (FileSplit from org.apache.hadoop.mapreduce.lib.input)
((FileSplit) ctx.getInputSplit()).getPath();

Here's a more complete solution that parses out the actual partition value:

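import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// LongWritable/Text are example types for line-oriented input; use your job's actual types.
class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    // regex to parse out the /partitionName=partitionValue/ pairs
    private static final Pattern partitionRegex =
            Pattern.compile("(?<=/)(?<name>[_\\-\\w]+)=(?<value>[^/]*)(?=/)");

    public static String parsePartitionValue(String path, String partitionName) throws IllegalArgumentException {
        Matcher m = partitionRegex.matcher(path);
        while (m.find()) {
            if (m.group("name").equals(partitionName)) {
                return m.group("value");
            }
        }
        throw new IllegalArgumentException(String.format("Partition [%s] not found", partitionName));
    }

    @Override
    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // ctx.getInputSplit() is the same split for every record a mapper task processes,
        // so this value could also be computed once in setup(Context).
        String partitionVal = parsePartitionValue(ctx.getInputSplit().toString(), "my_partition_col");
        // ... use partitionVal together with key/value
    }
}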
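The regex can be tried outside Hadoop. The following is a minimal sketch, assuming a hypothetical helper class `PartitionPaths` with a method `parsePartitions` (neither is part of Hadoop or Hive), that applies the same pattern to a split string and collects every `name=value` directory into an ordered map. It does not try to undo any escaping Hive may apply to special characters in partition values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionPaths {

    // Same pattern as in MyMapper: a "name=value" segment preceded by '/' and
    // followed by '/', so the data file name at the end is never matched.
    private static final Pattern PARTITION =
            Pattern.compile("(?<=/)(?<name>[_\\-\\w]+)=(?<value>[^/]*)(?=/)");

    // Hypothetical helper: returns all partition columns in directory order.
    public static Map<String, String> parsePartitions(String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        Matcher m = PARTITION.matcher(path);
        while (m.find()) {
            partitions.put(m.group("name"), m.group("value"));
        }
        return partitions;
    }

    public static void main(String[] args) {
        String split = "hdfs://namenode/warehouse/tablename/partition1=20/partition2=ABC/000001_0:0+12345678";
        // prints {partition1=20, partition2=ABC}
        System.out.println(parsePartitions(split));
    }
}
```

Returning all pairs at once avoids rescanning the path when a table has several partition columns.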

Upvotes: 0

Eswara Reddy Adapa

Reputation: 995

In a partitioned Hive table, the underlying HDFS data is organised by default like this:

 table/root/folder/x=1/y=1
 table/root/folder/x=1/y=2
 table/root/folder/x=2/y=1
 table/root/folder/x=2/y=2
 ...

You can build each of these input paths in the driver and add them with multiple calls to FileInputFormat.addInputPath(job, path), one call per folder path that you built.
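Building those folder paths amounts to expanding every combination of partition values. Here is a minimal plain-Java sketch, assuming a hypothetical `PartitionPathBuilder.partitionPaths` helper and that the partition values are known up front; in a real driver each resulting string would be wrapped in `new Path(...)` and passed to `FileInputFormat.addInputPath`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionPathBuilder {

    // Hypothetical helper: expands every combination of partition values into a
    // directory path, following the column order of the insertion-ordered map.
    public static List<String> partitionPaths(String root, LinkedHashMap<String, List<String>> values) {
        List<String> paths = new ArrayList<>();
        paths.add(root);
        for (Map.Entry<String, List<String>> column : values.entrySet()) {
            List<String> next = new ArrayList<>();
            for (String prefix : paths) {
                for (String value : column.getValue()) {
                    next.add(prefix + "/" + column.getKey() + "=" + value);
                }
            }
            paths = next;
        }
        return paths;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> values = new LinkedHashMap<>();
        values.put("x", Arrays.asList("1", "2"));
        values.put("y", Arrays.asList("1", "2"));
        for (String p : partitionPaths("table/root/folder", values)) {
            // in a driver: FileInputFormat.addInputPath(job, new Path(p));
            System.out.println(p);
        }
    }
}
```

With `x` and `y` each taking the values 1 and 2 this yields the four folders listed above, from `x=1/y=1` to `x=2/y=2`.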

Sample code is pasted below. Note how the paths are added with MyMapper.class. In this sample I am using the MultipleInputs API. The table is partitioned by 'part' and 'xdate'.

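import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // compression settings (older mapred.* property names)
        conf.set("mapred.compress.map.output", "true");
        conf.set("mapred.output.compression.type", "BLOCK");

        Job job = Job.getInstance(conf);
        // set up various job parameters
        job.setJarByClass(MyDriver.class);
        job.setJobName(conf.get("job.name"));
        MultipleInputs.addInputPath(job, new Path(conf.get("root.folder") + "/xdate=" + conf.get("start.date")),
                TextInputFormat.class, OneMapper.class);

        // input format class name comes from the configuration ("input.format")
        Class<? extends InputFormat> inputFormat =
                Class.forName(conf.get("input.format")).asSubclass(FileInputFormat.class);
        for (Path path : getPathList(conf)) {
            System.out.println("path: " + path);
            // every existing partition folder is read by MyMapper
            MultipleInputs.addInputPath(job, path, inputFormat, MyMapper.class);
        }
        // ...
        return job.waitForCompletion(true) ? 0 : -2;
    }

    // Builds rootdir/part=<part>/xdate=<yyyy-MM-dd> for every part in "part.list" and every day
    // from "start.date" to "end.date" (inclusive), keeping only folders that exist in HDFS.
    private static List<Path> getPathList(Configuration conf) throws IOException, ParseException {
        String rootdir = conf.get("input.path.rootfolder");
        String[] parts = conf.get("part.list").split(",");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Date startDate = sdf.parse(conf.get("start.date"));
        Date endDate = sdf.parse(conf.get("end.date"));

        FileSystem fs = FileSystem.get(conf);   // obtain once, not once per day
        List<Path> pathList = new ArrayList<Path>();
        GregorianCalendar gcal = new GregorianCalendar();
        for (String part : parts) {
            gcal.setTime(startDate);
            Date d;
            do {
                d = gcal.getTime();
                Path path = new Path(rootdir + "/part=" + part + "/xdate=" + sdf.format(d));
                if (fs.exists(path)) {   // skip partitions that were never written
                    pathList.add(path);
                }
                gcal.add(Calendar.DAY_OF_YEAR, 1);
            } while (d.before(endDate));
        }
        // errors (bad dates, HDFS failures) propagate instead of silently yielding an empty input list
        return pathList;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyDriver(), args);
        System.exit(res);
    }
}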
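The day-by-day loop in getPathList can also be written with java.time (Java 8+). This is a minimal sketch using a hypothetical `DatePartitionPaths.candidatePaths` helper that only builds the candidate folder strings; the `fs.exists` filtering and the conversion to Hadoop `Path` objects are left to the driver. Unlike the do-while loop, it produces nothing when the start date is after the end date.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DatePartitionPaths {

    // Hypothetical helper: candidate folders root/part=<p>/xdate=<yyyy-MM-dd> for every
    // part and every day from start to end, both inclusive.
    public static List<String> candidatePaths(String root, String[] parts,
                                              LocalDate start, LocalDate end) {
        List<String> paths = new ArrayList<>();
        for (String part : parts) {
            for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
                // LocalDate.toString() uses ISO-8601, i.e. yyyy-MM-dd
                paths.add(root + "/part=" + part + "/xdate=" + d);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        List<String> paths = candidatePaths("/warehouse/mytable", "a,b".split(","),
                LocalDate.parse("2014-01-30"), LocalDate.parse("2014-02-01"));
        paths.forEach(System.out::println);
    }
}
```

`plusDays` handles month and year boundaries, so no Calendar arithmetic is needed.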

Upvotes: 3
