Reputation: 561
I'm currently using Python to bulk load CSV data into an HBase table, and I'm currently having trouble with writing the appropriate HFiles using saveAsNewAPIHadoopFile
My code currently looks as follows:
def csv_to_key_value(row):
cols = row.split(",")
result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
(cols[0], [cols[0], "f2", "c2", cols[2]]),
(cols[0], [cols[0], "f3", "c3", cols[3]]))
return result
def bulk_load(rdd):
conf = {#Ommitted to simplify}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
.flatMap(csv_to_key_value)
if not load_rdd.isEmpty():
load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
"org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
conf=conf,
keyConverter=keyConv,
valueConverter=valueConv)
else:
print("Nothing to process")
When I run this code, I get the following error:
java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
Since the error indicates that the key is the problem, I grabbed the elements from my RDD and they are as follows (formatted for readability)
[(u'1', [u'1', 'f1', 'c1', u'A']),
(u'1', [u'1', 'f2', 'c2', u'1A']),
(u'1', [u'1', 'f3', 'c3', u'10']),
(u'2', [u'2', 'f1', 'c1', u'B']),
(u'2', [u'2', 'f2', 'c2', u'2B']),
(u'2', [u'2', 'f3', 'c3', u'9']),
. . .
(u'9', [u'9', 'f1', 'c1', u'I']),
(u'9', [u'9', 'f2', 'c2', u'3C']),
(u'9', [u'9', 'f3', 'c3', u'2']),
(u'10', [u'10', 'f1', 'c1', u'J']),
(u'10', [u'10', 'f2', 'c2', u'1A']),
(u'10', [u'10', 'f3', 'c3', u'1'])]
This is a perfect match for my CSV, in the correct order. As far as I understand, in HBase a key is defined by {row, family, timestamp}. Row and family are combination are unique and monotonically increasing for all entries in my data, and I have no control of the timestamp (which is the only problem I can imagine)
Can anybody advise me on how to avoid/prevent such problems?
Upvotes: 1
Views: 1388
Reputation: 561
Well this was just a silly error on my part, and I feel a bit foolish. Lexicographically, the order should be 1, 10, 2, 3... 8, 9. Easiest way to guarantee correct ordering before loading is:
rdd.sortByKey(true);
I hope I can save at least one person the headaches I had.
Upvotes: 2