Reputation: 2617
I'm trying to create an RDD based on data from an HBase table:
val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
  .map {
    case (key, row) => parse(key, row)
  }
parse
is called for every table row, regardless of any further manipulations of the data.
Is it possible to retrieve only the rows whose keys match some condition (e.g. keys in a specific range), in order to operate only on them?
Upvotes: 2
Views: 821
Reputation: 2933
HBase is a key/value store with rows sorted by key, which means that:
All retrieval operations come down to two classes: Get and Scan. It's not hard to guess what they do: a Scan iterates over all rows unless you specify startRow/stopRow. You can also set filters on a Scan, but it still has to iterate over all the rows; filters only lower network pressure, because HBase may have to return fewer rows.
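For illustration, here is a minimal sketch of the two access paths using the plain HBase client API (the table name "my_table" and the key values are hypothetical):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Scan}
import org.apache.hadoop.hbase.util.Bytes

val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("my_table")) // hypothetical table

// Get: fetch a single row by its exact key.
val single = table.get(new Get(Bytes.toBytes("key_0042")))

// Scan: iterate over a key range; without startRow/stopRow
// it walks the whole table.
val scan = new Scan()
scan.setStartRow(Bytes.toBytes("key_0001"))
scan.setStopRow(Bytes.toBytes("key_0100"))
val scanner = table.getScanner(scan)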
TableInputFormat in your example uses a Scan internally to access HBase:
public void setConf(Configuration configuration) {
  this.conf = configuration;
  Scan scan = null;

  if (conf.get(SCAN) != null) {
    try {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
    } catch (IOException e) {
      LOG.error("An error occurred.", e);
    }
  } else {
    try {
      scan = createScanFromConfiguration(conf);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  setScan(scan);
}
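Since setConf checks the SCAN property first, one option is to build your own Scan with the key range you need and serialize it into the configuration before creating the RDD. A minimal sketch (the table name and key values are hypothetical):
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

val hBaseConfig = HBaseConfiguration.create()
hBaseConfig.set(TableInputFormat.INPUT_TABLE, "my_table") // hypothetical table name

// Restrict the scan to the key range we care about, then serialize it
// into the SCAN property that setConf reads first.
val scan = new Scan()
scan.setStartRow(Bytes.toBytes("key_0001"))
scan.setStopRow(Bytes.toBytes("key_0100"))
hBaseConfig.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
With this configuration, newAPIHadoopRDD only reads rows in that range, so your parse function never sees the rest of the table.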
The createScanFromConfiguration method inside TableInputFormat can also give you a hint on how to set filters and key ranges:
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
  Scan scan = new Scan();

  if (conf.get(SCAN_ROW_START) != null) {
    scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
  }

  if (conf.get(SCAN_ROW_STOP) != null) {
    scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
  }

  if (conf.get(SCAN_COLUMNS) != null) {
    addColumns(scan, conf.get(SCAN_COLUMNS));
  }

  if (conf.get(SCAN_COLUMN_FAMILY) != null) {
    scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
  }

  if (conf.get(SCAN_TIMESTAMP) != null) {
    scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
  }

  if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
    scan.setTimeRange(
        Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
        Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
  }

  if (conf.get(SCAN_MAXVERSIONS) != null) {
    scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
  }

  if (conf.get(SCAN_CACHEDROWS) != null) {
    scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
  }

  if (conf.get(SCAN_BATCHSIZE) != null) {
    scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
  }

  // false by default, full table scans generate too much BC churn
  scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

  return scan;
}
This Stack Overflow answer provides an example of how to set a Scan on hbaseConfig. Note, though, that you don't have to set a whole Scan: you can just set specific properties such as SCAN_ROW_START and the others from the createScanFromConfiguration method I referred to above.
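For example, a sketch of the property-based approach (the values go through Bytes.toBytesBinary, so plain strings work for string row keys; the keys shown are hypothetical):
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Only rows with keys in ["key_0001", "key_0100") will be scanned.
hBaseConfig.set(TableInputFormat.SCAN_ROW_START, "key_0001")
hBaseConfig.set(TableInputFormat.SCAN_ROW_STOP, "key_0100")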
Upvotes: 1