Mykhaylo Adamovych
Mykhaylo Adamovych

Reputation: 20956

Hbase reverse scan

My data keys are stored in format trade<date><index>

trade1907030001
trade1907030002
trade1907040001
trade1907040002
trade1907050001
trade1907050002

What is proper way to implement 'reverse' scan to iterate over all trades for the day or from specific row down to the end of the day or even between two exact trades?

Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));
scan.setStopRow(Bytes.toBytes(trade + day));

Having in mind that according to documentatin start row is inclusive and end row is exclusive, we'll miss oldest trade of the day. If the row is actually the trade row trade we must not increment the key, otherwise next trade will be picked up. It started to be conditional. How could I make it work reliable for different situations?

Upvotes: 1

Views: 3089

Answers (2)

Mykhaylo Adamovych
Mykhaylo Adamovych

Reputation: 20956

This is how scan actually works (tested in hbase shell v1.2.0-cdh5.13.3):

trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171018B00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171113B00001                                          column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
trade171114S00001                                          column=inp:data_as_of_date, timestamp=1511993729979, value=20171114

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171018B00001', ENDROW=>'trade171113B00001'}
ROW                                                                  COLUMN+CELL
trade171018B00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171113B00001', ENDROW=>'trade171018B00001', REVERSED=>true}
ROW                                                                  COLUMN+CELL
trade171113B00001                                          column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171018', ENDROW=>'trade171113'}
ROW                                                                  COLUMN+CELL
trade171018B00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171113', ENDROW=>'trade171018', REVERSED=>true}
ROW                                                                  COLUMN+CELL
trade171020S00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171018B00001                                          column=inp:data_as_of_date, timestamp=1511793438335, value=20171020

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], ROWPREFIXFILTER=>'trade171113'}
ROW                                                                  COLUMN+CELL
trade171113B00001                                          column=inp:data_as_of_date, timestamp=1511993729979, value=20171114

scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], ROWPREFIXFILTER=>'trade171113', REVERSED=>true}
ROW                                                                  COLUMN+CELL
0 row(s) in 0.2300 seconds

If start row and end row is shorter then table row keys, following will work as expected

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(trade + day));
scan.setStopRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));

Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));
scan.setStopRow(Bytes.toBytes(trade + day));

If start row and end row could be same length as table row keys, following will work as expected

Scan scan = new Scan();
scan.setStartRow(createKey("S", productSymbolId, YYMMDD.print(fromDate)));
scan.setStopRow(createNextKey("S", productSymbolId, YYMMDD.print(toDate)));

Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(createKeyBeforeNext("A", stripSpaces(accountId), YYMMDD.print(toDate)));
scan.setStopRow(createKeyBefore("A", stripSpaces(accountId), YYMMDD.print(fromDate)));

where

key === 54686973697361746573746b6579
next === 54686973697361746573746b657a
before === 54686973697361746573746b6578ffffffffffffffffff
beforeNext === 54686973697361746573746b6579ffffffffffffffffff

implementation

/**
 * <h4>usage</h4>
 * 
 * <pre>
 * Scan scan = new Scan();
 * scan.setStartRow(createKey("S", productSymbolId, YYMMDD.print(fromDate)));
 * scan.setStopRow(createNextKey("S", productSymbolId, YYMMDD.print(toDate)));
 *
 * Scan scan = new Scan();
 * scan.setReversed(true);
 * scan.setStartRow(createKeyBeforeNext("A", stripSpaces(accountId), YYMMDD.print(toDate)));
 * scan.setStopRow(createKeyBefore("A", stripSpaces(accountId), YYMMDD.print(fromDate)));
 * </pre>
 * 
 * <h4>spec</h4>
 * 
 * <pre>
 * key === 54686973697361746573746b6579
 * next === 54686973697361746573746b657a
 * before === 54686973697361746573746b6578ffffffffffffffffff
 * beforeNext === 54686973697361746573746b6579ffffffffffffffffff
 * </pre>
 * 
 * @see #createKeyBefore(String...)
 * @see #createKeyBeforeNext(String...)
 * @see #createNextKey(String...)
 */
// similar to Bytes.add(final byte [] a, final byte [] b, final byte [] c) {
public static byte[] createKey(String... parts) {
    byte[][] bytes = new byte[parts.length][];
    int size = 0;
    for (int i = 0; i < parts.length; i++) {
        bytes[i] = toBytes(parts[i]);
        size += bytes[i].length;
    }
    byte[] result = new byte[size];
    for (int i = 0, j = 0; i < bytes.length; i++) {
        arraycopy(bytes[i], 0, result, j, bytes[i].length);
        j += bytes[i].length;
    }
    return result;
}

/**
 * Create the next row
 * 
 * <pre>
 * key === 54686973697361746573746b6579
 * next === 54686973697361746573746b657a
 * </pre>
 * 
 * @see #createKey(String...)
 */
public static byte[] createNextKey(String... parts) {
    return unsignedCopyAndIncrement(createKey(parts));
}

/**
 * Create the closest row before
 * 
 * <pre>
 * key === 54686973697361746573746b6579
 * before === 54686973697361746573746b6578ffffffffffffffffff
 * </pre>
 * 
 * @see #createKey(String...)
 */
public static byte[] createKeyBefore(String... parts) {
    return createClosestRowBefore(createKey(parts));
}

/**
 * Create the closest row before the next row
 * 
 * <pre>
 * key === 54686973697361746573746b6579
 * beforeNext === 54686973697361746573746b6579ffffffffffffffffff
 * </pre>
 * 
 * @see #createKey(String...)
 */
public static byte[] createKeyBeforeNext(String... parts) {
    return createClosestRowBefore(createNextKey(parts));
}

// from hbase sources ClientScanner.createClosestRowBefore(byte[] row)
private static byte[] createClosestRowBefore(byte[] row) {
    if (row == null)
        throw new IllegalArgumentException("The passed row is empty");
    if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY))
        return MAX_BYTE_ARRAY;
    if (row[row.length - 1] == 0)
        return Arrays.copyOf(row, row.length - 1);
    byte[] closestFrontRow = Arrays.copyOf(row, row.length);
    closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
    closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
    return closestFrontRow;
}

Upvotes: 1

Ben Watson
Ben Watson

Reputation: 5521

You can use:

Scan scan = new Scan();
scan.setReversed(true);
scan.setRowPrefixFilter(Bytes.toBytes(trade + day));

which automatically takes cares of ensuring the first and last trades aren't ignored.

Source: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html#setRowPrefixFilter-byte:A-

Upvotes: 1

Related Questions