Aliaxander
Aliaxander

Reputation: 2617

Create RDD based on a part of HBase rows

I'm trying to create RDD based on data from HBase table:

val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
  .map {
    case (key, row) => parse(key, row)
  }

parse is called for each table row not considering further manipulations with the data.

Is it possible to retrieve only rows with specific keys that match some condition (i.e. keys are in some specific range) in order to operate only on them?

Upvotes: 2

Views: 821

Answers (1)

MaxNevermind
MaxNevermind

Reputation: 2933

HBase is a key/value store with rows sorted by the key, which means that:

  • it's efficient at retrieving single rows by key or sequence of rows by key range
  • it's not efficient at retrieving random rows by some condition

All retrieving operations come down to two classes: Get and Scan. This is not hard to guess what they do, scan will iterate over all rows unless you specify stopRow/startRow. You also can set filters on Scan but it still has to iterate all the rows, filters just can lower network pressure because HBase possibly will have to return fewer rows.

TableInputFormat in your example uses Scan inside of it to access Hbase:

  public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
          LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

Also createScanFromConfiguration method inside of TableInputFormat can give you a hint on how you can set filters and key ranges:

  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default, full table scans generate too much BC churn
    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

    return scan;
  }

This stackoverflow answer provide an example of how to set a Scan on hbaseConfig, notice though you don't have to set Scan, you may just set specific properties like SCAN_ROW_START and other from createScanFromConfiguration I referred above.

Upvotes: 1

Related Questions