Reputation: 161
I need to implement an MR job that accesses data from both an HBase table and HDFS files. E.g., the mappers read data from an HBase table and from HDFS files; these data share the same primary key but have different schemas. A reducer then joins all columns (from the HBase table and the HDFS files) together.
I tried looking online and could not find a way to run an MR job with such mixed data sources. MultipleInputs seems to work only for multiple HDFS data sources. Please let me know if you have any ideas. Sample code would be great.
Upvotes: 3
Views: 4163
Reputation: 161
After a few days of investigation (and some help from the HBase user mailing list), I finally figured out how to do it. Here is the source code:
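import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // Parses "key,value" lines from the HDFS text input.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String s = value.toString();
            String[] sa = s.split(",");
            if (sa.length == 2) {
                context.write(new Text(sa[0]), new Text(sa[1]));
            }
        }
    }

    // Emits (row key, value of cf:c1) for each HBase row.
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = Bytes.toBytes("cf");
        public static final byte[] ATTR1 = Bytes.toBytes("c1");

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            String key = Bytes.toString(row.get());
            byte[] raw = value.getValue(CF, ATTR1);
            if (raw != null) { // skip rows without cf:c1 instead of throwing a NullPointerException
                context.write(new Text(key), new Text(Bytes.toString(raw)));
            }
        }
    }

    // Receives the values from both sources grouped by key and writes one line per value.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);
        Path inputPath2 = new Path(args[1]);
        Path outputPath = new Path(args[2]);
        String tableName = "test";

        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleRead");
        job.setJarByClass(MixMR.class); // class that contains mapper

        Scan scan = new Scan();
        scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                tableName, // input HBase table name
                scan, // Scan instance to control CF and attribute selection
                TableMap.class, // mapper
                Text.class, // mapper output key
                Text.class, // mapper output value
                job);

        job.setReducerClass(Reduce.class); // reducer class
        job.setOutputFormatClass(TextOutputFormat.class);

        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
        // inputPath2 here has no effect for the HBase table; the table name and scan
        // come from the configuration set by initTableMapperJob above
        MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}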
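Note that the reducer in this code only writes each value back out under its key; it does not yet combine the HBase column and the file column into one row. One common way to do that is to have each mapper prefix its value with a source tag, so the reducer can tell the two sides apart. The following is only a plain-Java sketch of that merging step, with no Hadoop dependencies so it can be run on its own. The class name `TaggedJoinSketch` and the tags `H:` and `F:` are hypothetical and not part of the job above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: mimics what a reducer could do with the values
// that arrive grouped under a single key.
public class TaggedJoinSketch {
    // Assumed tags: each mapper would prepend one of these to its output value.
    static final String HBASE_TAG = "H:";
    static final String FILE_TAG = "F:";

    // Merges the tagged values for one key into joined rows "hbaseCol,fileCol".
    // A key seen in only one source yields no rows (inner-join semantics).
    static List<String> join(Iterable<String> taggedValues) {
        List<String> hbaseCols = new ArrayList<>();
        List<String> fileCols = new ArrayList<>();
        for (String v : taggedValues) {
            if (v.startsWith(HBASE_TAG)) {
                hbaseCols.add(v.substring(HBASE_TAG.length()));
            } else if (v.startsWith(FILE_TAG)) {
                fileCols.add(v.substring(FILE_TAG.length()));
            }
        }
        List<String> joined = new ArrayList<>();
        for (String h : hbaseCols) {
            for (String f : fileCols) {
                joined.add(h + "," + f);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Values arrive in no particular order, as they would in a reducer.
        System.out.println(join(Arrays.asList("F:bar", "H:foo"))); // prints [foo,bar]
    }
}
```

In the real job, TableMap would write something like `new Text("H:" + val)` and Map would write `new Text("F:" + sa[1])`, and the body of `reduce` would apply the same split-and-combine logic to its `Iterable<Text>`.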
Upvotes: 8
Reputation: 25909
A Pig script or Hive query can do that easily.
Sample Pig script:
tbl = LOAD 'hbase://SampleTable'
    USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
        'info:* ...', '-loadKey true -limit 5')
    AS (id:bytearray, info_map:map[],...);
fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);
Joined = JOIN tbl BY id, fle BY id;
STORE Joined INTO ...
Upvotes: 0
Reputation: 34184
There is no out-of-the-box feature that supports this. A possible workaround is to Scan your HBase table and write the Results to an HDFS file first, and then do the reduce-side join using MultipleInputs. But this will incur some additional I/O overhead.
Upvotes: 0