Reputation: 20956
My row keys are stored in the format trade<date><index>:
trade1907030001
trade1907030002
trade1907040001
trade1907040002
trade1907050001
trade1907050002
What is the proper way to implement a 'reverse' scan that iterates over all trades for a day, or from a specific row down to the end of the day, or even between two exact trades?
Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));
scan.setStopRow(Bytes.toBytes(trade + day));
According to the documentation, the start row is inclusive and the stop row is exclusive, so we'll miss the oldest trade of the day. And if the boundary is an actual trade row key rather than a prefix, we must not increment it, otherwise the next trade will be picked up as well. The logic is becoming conditional. How can I make it work reliably in all of these situations?
Upvotes: 1
Views: 3089
Reputation: 20956
This is how a scan actually behaves (tested in the HBase shell, v1.2.0-cdh5.13.3):
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171018B00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171113B00001 column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
trade171114S00001 column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171018B00001', ENDROW=>'trade171113B00001'}
ROW COLUMN+CELL
trade171018B00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171113B00001', ENDROW=>'trade171018B00001', REVERSED=>true}
ROW COLUMN+CELL
trade171113B00001 column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171018', ENDROW=>'trade171113'}
ROW COLUMN+CELL
trade171018B00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], STARTROW=>'trade171113', ENDROW=>'trade171018', REVERSED=>true}
ROW COLUMN+CELL
trade171020S00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
trade171018B00001 column=inp:data_as_of_date, timestamp=1511793438335, value=20171020
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], ROWPREFIXFILTER=>'trade171113'}
ROW COLUMN+CELL
trade171113B00001 column=inp:data_as_of_date, timestamp=1511993729979, value=20171114
scan 'namespace:table', {COLUMNS=>['inp:data_as_of_date'], ROWPREFIXFILTER=>'trade171113', REVERSED=>true}
ROW COLUMN+CELL
0 row(s) in 0.2300 seconds
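The inclusive-start, exclusive-stop behaviour seen in the shell output above can be reproduced without a cluster. The following is only a rough sketch, assuming a sorted in-memory set is a fair stand-in for the table; the class name ScanBoundsSketch and the helpers forward and reversed are made up for illustration and are not HBase API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class ScanBoundsSketch {
    // The four distinct rows from the shell session, kept sorted like table rows.
    static final TreeSet<String> ROWS = new TreeSet<>(List.of(
            "trade171018B00001", "trade171020S00001",
            "trade171113B00001", "trade171114S00001"));

    // Forward scan: start row inclusive, stop row exclusive.
    static List<String> forward(String start, String stop) {
        return new ArrayList<>(ROWS.subSet(start, true, stop, false));
    }

    // Reversed scan: start is the upper bound (inclusive),
    // stop is the lower bound (exclusive), rows come back in descending order.
    static List<String> reversed(String start, String stop) {
        return new ArrayList<>(ROWS.descendingSet().subSet(start, true, stop, false));
    }

    public static void main(String[] args) {
        // Exact keys: the stop row itself is left out in both directions.
        System.out.println(forward("trade171018B00001", "trade171113B00001"));
        System.out.println(reversed("trade171113B00001", "trade171018B00001"));
        // Shorter prefixes as bounds: no real row equals a bound.
        System.out.println(reversed("trade171113", "trade171018"));
    }
}
```

With exact keys the stop row is dropped, which is why the reversed scan ends at trade171020S00001. With the shorter prefixes, trade171113B00001 sorts after the start bound trade171113 and is therefore not returned.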
If the start row and stop row are shorter than the table row keys (that is, they are prefixes), the following works as expected:
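// Forward scan over one day: from the prefix up to (but excluding) the incremented prefix
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(trade + day));
scan.setStopRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));

// Reversed scan over the same day: from the incremented prefix down to the prefix
Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(Bytes.unsignedCopyAndIncrement(Bytes.toBytes(trade + day)));
scan.setStopRow(Bytes.toBytes(trade + day));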
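To see why prefix bounds are safe in both directions, here is a small sketch using the keys from the question. It assumes an in-memory sorted set in place of the table, and the helper next is a simplified stand-in for Bytes.unsignedCopyAndIncrement that only bumps the last character (no carry handling). All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PrefixDayScanSketch {
    // Row keys from the question: trade<date><index>.
    static final TreeSet<String> ROWS = new TreeSet<>(List.of(
            "trade1907030001", "trade1907030002",
            "trade1907040001", "trade1907040002",
            "trade1907050001", "trade1907050002"));

    // Simplified stand-in for Bytes.unsignedCopyAndIncrement:
    // assumes a non-empty ASCII prefix whose last character can be incremented without carry.
    static String next(String prefix) {
        int last = prefix.length() - 1;
        return prefix.substring(0, last) + (char) (prefix.charAt(last) + 1);
    }

    // Forward: [prefix, next(prefix))
    static List<String> dayForward(String prefix) {
        return new ArrayList<>(ROWS.subSet(prefix, true, next(prefix), false));
    }

    // Reversed: start at next(prefix) (inclusive), stop at prefix (exclusive).
    static List<String> dayReversed(String prefix) {
        return new ArrayList<>(ROWS.descendingSet().subSet(next(prefix), true, prefix, false));
    }

    public static void main(String[] args) {
        System.out.println(dayForward("trade190704"));
        System.out.println(dayReversed("trade190704"));
    }
}
```

Neither bound is a real row key, because every row is longer than the prefix. The exclusive stop therefore never removes a trade, and the inclusive start never adds one from the next day.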
If the start row and stop row can be the same length as the table row keys (that is, they can be actual row keys), the following works as expected:
// Forward scan: from the fromDate key (inclusive) up to the key after the toDate key (exclusive)
Scan scan = new Scan();
scan.setStartRow(createKey("S", productSymbolId, YYMMDD.print(fromDate)));
scan.setStopRow(createNextKey("S", productSymbolId, YYMMDD.print(toDate)));

// Reversed scan: start just before the key after toDate, stop just before the fromDate key
Scan scan = new Scan();
scan.setReversed(true);
scan.setStartRow(createKeyBeforeNext("A", stripSpaces(accountId), YYMMDD.print(toDate)));
scan.setStopRow(createKeyBefore("A", stripSpaces(accountId), YYMMDD.print(fromDate)));
where
key === 54686973697361746573746b6579
next === 54686973697361746573746b657a
before === 54686973697361746573746b6578ffffffffffffffffff
beforeNext === 54686973697361746573746b6579ffffffffffffffffff
Implementation:
/**
* <h4>usage</h4>
*
* <pre>
* Scan scan = new Scan();
* scan.setStartRow(createKey("S", productSymbolId, YYMMDD.print(fromDate)));
* scan.setStopRow(createNextKey("S", productSymbolId, YYMMDD.print(toDate)));
*
* Scan scan = new Scan();
* scan.setReversed(true);
* scan.setStartRow(createKeyBeforeNext("A", stripSpaces(accountId), YYMMDD.print(toDate)));
* scan.setStopRow(createKeyBefore("A", stripSpaces(accountId), YYMMDD.print(fromDate)));
* </pre>
*
* <h4>spec</h4>
*
* <pre>
* key === 54686973697361746573746b6579
* next === 54686973697361746573746b657a
* before === 54686973697361746573746b6578ffffffffffffffffff
* beforeNext === 54686973697361746573746b6579ffffffffffffffffff
* </pre>
*
* @see #createKeyBefore(String...)
* @see #createKeyBeforeNext(String...)
* @see #createNextKey(String...)
*/
// Similar to Bytes.add(byte[] a, byte[] b, byte[] c), but for any number of parts.
public static byte[] createKey(String... parts) {
    // Convert every part to bytes and work out the total key length.
    byte[][] bytes = new byte[parts.length][];
    int size = 0;
    for (int i = 0; i < parts.length; i++) {
        bytes[i] = Bytes.toBytes(parts[i]);
        size += bytes[i].length;
    }
    // Concatenate the parts into a single row key.
    byte[] result = new byte[size];
    for (int i = 0, j = 0; i < bytes.length; i++) {
        System.arraycopy(bytes[i], 0, result, j, bytes[i].length);
        j += bytes[i].length;
    }
    return result;
}
/**
* Create the next row
*
* <pre>
* key === 54686973697361746573746b6579
* next === 54686973697361746573746b657a
* </pre>
*
* @see #createKey(String...)
*/
public static byte[] createNextKey(String... parts) {
    return Bytes.unsignedCopyAndIncrement(createKey(parts));
}
/**
* Create the closest row before
*
* <pre>
* key === 54686973697361746573746b6579
* before === 54686973697361746573746b6578ffffffffffffffffff
* </pre>
*
* @see #createKey(String...)
*/
public static byte[] createKeyBefore(String... parts) {
    return createClosestRowBefore(createKey(parts));
}
/**
* Create the closest row before the next row
*
* <pre>
* key === 54686973697361746573746b6579
* beforeNext === 54686973697361746573746b6579ffffffffffffffffff
* </pre>
*
* @see #createKey(String...)
*/
public static byte[] createKeyBeforeNext(String... parts) {
    return createClosestRowBefore(createNextKey(parts));
}
// Adapted from the HBase sources: ClientScanner.createClosestRowBefore(byte[] row).
// MAX_BYTE_ARRAY is nine 0xff bytes, matching the suffix shown in the spec.
private static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);

private static byte[] createClosestRowBefore(byte[] row) {
    if (row == null) {
        throw new IllegalArgumentException("The passed row is empty");
    }
    if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
        return MAX_BYTE_ARRAY;
    }
    // A trailing 0x00 byte is simply dropped.
    if (row[row.length - 1] == 0) {
        return Arrays.copyOf(row, row.length - 1);
    }
    // Otherwise decrement the last byte and pad with 0xff bytes.
    byte[] closestFrontRow = Arrays.copyOf(row, row.length);
    closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
    return Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
}
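The four boundary keys from the spec can be checked without an HBase dependency. The sketch below re-implements the two byte operations in plain Java under simplifying assumptions: rows are non-empty, and next does no carry handling. The class and method names are hypothetical and are not the HBase helpers themselves.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyBoundsSketch {
    // Number of 0xff bytes appended, as seen in the spec (nine bytes).
    static final int MAX_SUFFIX = 9;

    // Simplified "next key": increment the last byte.
    // Assumes a non-empty key whose last byte is not 0xff.
    static byte[] next(byte[] key) {
        byte[] out = Arrays.copyOf(key, key.length);
        out[out.length - 1]++;
        return out;
    }

    // Closest row before: drop a trailing 0x00, otherwise decrement
    // the last byte and pad with 0xff bytes. Assumes a non-empty row.
    static byte[] closestBefore(byte[] row) {
        if (row[row.length - 1] == 0) {
            return Arrays.copyOf(row, row.length - 1);
        }
        byte[] out = Arrays.copyOf(row, row.length + MAX_SUFFIX);
        out[row.length - 1]--;
        Arrays.fill(out, row.length, out.length, (byte) 0xff);
        return out;
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] key = "Thisisatestkey".getBytes(StandardCharsets.US_ASCII);
        System.out.println("key        === " + hex(key));
        System.out.println("next       === " + hex(next(key)));
        System.out.println("before     === " + hex(closestBefore(key)));
        System.out.println("beforeNext === " + hex(closestBefore(next(key))));
    }
}
```

The sample key in the spec is the ASCII string "Thisisatestkey", so running this should print the same four hex values as listed there.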
Upvotes: 1
Reputation: 5521
You can use:
Scan scan = new Scan();
scan.setReversed(true);
scan.setRowPrefixFilter(Bytes.toBytes(trade + day));
which automatically takes care of ensuring the first and last trades aren't ignored.
Source: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html#setRowPrefixFilter-byte:A-
Upvotes: 1