Reputation: 83
I am currently writing a MapReduce program to find the difference between two Hive tables. My Hive tables are partitioned on one or more columns, so the folder names contain the values of the partition columns.
Is there any way to read a partitioned Hive table?
Can it be read in the mapper?
Upvotes: 2
Views: 4797
Reputation: 4125
Yes, it can be read in Mapper pretty easily. This answer is based on the idea mentioned by @Daniel Koverman.
With the Context object passed to Mapper.map(), you can get the file split path this way:
// this gives you the path plus offsets, e.g. hdfs://.../tablename/partition1=20/partition2=ABC/000001_0:0+12345678
ctx.getInputSplit().toString();
// or this gets you the path only (FileSplit here is org.apache.hadoop.mapreduce.lib.input.FileSplit)
((FileSplit) ctx.getInputSplit()).getPath();
Here's a more complete solution that parses out the actual partition value:
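import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.mapreduce.Mapper;

// KEYIN, VALUEIN, KEYOUT and VALUEOUT are placeholders for your job's key/value types
class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // regex to parse out the /partitionName=partitionValue/ pairs
    private static final Pattern partitionRegex = Pattern.compile("(?<=/)(?<name>[_\\-\\w]+)=(?<value>[^/]*)(?=/)");

    // returns the value of the named partition column found in the path
    public static String parsePartitionValue(String path, String partitionName) throws IllegalArgumentException {
        Matcher m = partitionRegex.matcher(path);
        while (m.find()) {
            if (m.group("name").equals(partitionName)) {
                return m.group("value");
            }
        }
        throw new IllegalArgumentException(String.format("Partition [%s] not found", partitionName));
    }

    @Override
    public void map(KEYIN key, VALUEIN v, Context ctx) throws IOException, InterruptedException {
        // the split string contains the full file path, including the partition directories
        String partitionVal = parsePartitionValue(ctx.getInputSplit().toString(), "my_partition_col");
    }
}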
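If you need every partition column rather than a single one, the same regex idea can collect all name=value pairs in one pass. Below is a minimal sketch in plain Java with no Hadoop dependency; the class name PartitionPathParser and the sample path are made up for illustration, and it assumes partition values appear in the path exactly as you want them (it does not decode any escaping applied to special characters).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Hadoop or Hive.
class PartitionPathParser {
    // Matches one "name=value" directory segment enclosed by slashes.
    private static final Pattern SEGMENT =
            Pattern.compile("(?<=/)(?<name>[\\w\\-]+)=(?<value>[^/]*)(?=/)");

    // Returns all partition columns found in the path, in directory order.
    static Map<String, String> parse(String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        Matcher m = SEGMENT.matcher(path);
        while (m.find()) {
            partitions.put(m.group("name"), m.group("value"));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Made-up split string in the "path:start+length" form shown above.
        String split = "/warehouse/tablename/partition1=20/partition2=ABC/000001_0:0+12345678";
        System.out.println(parse(split)); // prints {partition1=20, partition2=ABC}
    }
}
```

In a mapper you would probably call parse once in setup() and cache the result, since the split does not change between map() calls.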
Upvotes: 0
Reputation: 995
By default, the underlying HDFS data of a partitioned Hive table is organised as
table/root/folder/x=1/y=1
table/root/folder/x=1/y=2
table/root/folder/x=2/y=1
table/root/folder/x=2/y=2 ....
so you can build each of these input paths in the driver and add them through multiple calls to FileInputFormat.addInputPath(job, path), one call per folder path that you built.
Sample code is pasted below. Note how the paths are added for MyMapper.class. In this sample I am using the MultipleInputs API, and the table is partitioned by 'part' and 'xdate'.
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapred.compress.map.output", "true");
        conf.set("mapred.output.compression.type", "BLOCK");
        // on Hadoop 2+, Job.getInstance(conf) is the non-deprecated equivalent
        Job job = new Job(conf);
        //set up various job parameters
        job.setJarByClass(MyDriver.class);
        job.setJobName(conf.get("job.name"));
        MultipleInputs.addInputPath(job, new Path(conf.get("root.folder") + "/xdate=" + conf.get("start.date")), TextInputFormat.class, OneMapper.class);
        // one addInputPath call per partition folder that exists
        for (Path path : getPathList(job, conf)) {
            System.out.println("path: " + path.toString());
            MultipleInputs.addInputPath(job, path, Class.forName(conf.get("input.format")).asSubclass(FileInputFormat.class).asSubclass(InputFormat.class), MyMapper.class);
        }
        ...
        ...
        return job.waitForCompletion(true) ? 0 : -2;
    }

    // builds rootdir/part=<part>/xdate=<date> for every part and every day from start.date to end.date (inclusive)
    private static ArrayList<Path> getPathList(Job job, Configuration conf) {
        String rootdir = conf.get("input.path.rootfolder");
        String partlist = conf.get("part.list");
        String startdate_s = conf.get("start.date");
        String enddate_s = conf.get("end.date");
        ArrayList<Path> pathlist = new ArrayList<Path>();
        String[] partlist_split = partlist.split(",");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Date startdate_d = null;
        Date enddate_d = null;
        Path path = null;
        try {
            startdate_d = sdf.parse(startdate_s);
            enddate_d = sdf.parse(enddate_s);
            FileSystem fs = FileSystem.get(conf);
            GregorianCalendar gcal = new GregorianCalendar();
            Date d = null;
            for (String part : partlist_split) {
                // restart from the first date for each part
                gcal.setTime(startdate_d);
                do {
                    d = gcal.getTime();
                    path = new Path(rootdir + "/part=" + part + "/xdate="
                            + sdf.format(d));
                    // skip partitions that do not exist on HDFS
                    if (fs.exists(path)) {
                        pathlist.add(path);
                    }
                    gcal.add(Calendar.DAY_OF_YEAR, 1);
                } while (d.before(enddate_d));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return pathlist;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyDriver(), args);
        System.exit(res);
    }
}
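If you are on Java 8 or later, the date loop in getPathList can be written more compactly with java.time. This is only a sketch of the path-building step under the same part/xdate layout as above: the class name PartitionPathBuilder and the sample root folder are hypothetical, it returns plain strings, and it leaves out the fs.exists check, so you would still wrap each string in a Path and filter it in the driver.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; it only builds strings and never touches HDFS.
class PartitionPathBuilder {
    // One path per (part, day) pair, for every day from start to end inclusive.
    static List<String> build(String rootDir, List<String> parts, LocalDate start, LocalDate end) {
        List<String> paths = new ArrayList<>();
        for (String part : parts) {
            for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
                // LocalDate.toString() yields ISO yyyy-MM-dd, matching the xdate format above
                paths.add(rootDir + "/part=" + part + "/xdate=" + d);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // Made-up root folder and part names.
        List<String> paths = build("/table/root/folder", Arrays.asList("a", "b"),
                LocalDate.parse("2020-02-28"), LocalDate.parse("2020-03-01"));
        for (String p : paths) {
            System.out.println(p);
        }
    }
}
```

One difference from the do/while loop above: if the start date is after the end date, this version returns an empty list, whereas the original still tries the start date once.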
Upvotes: 3