Reputation: 259
I am trying to insert data into elastic search from a hive table.
CREATE EXTERNAL TABLE IF NOT EXISTS es_temp_table (
dt STRING,
text STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource'='aggr_2014-10-01/metric','es.index.auto.create'='true')
;
INSERT OVERWRITE TABLE es_temp_table
SELECT dt, description
FROM other_table
However, the data is off. When I do a count(*) on my other table I am getting 6,000 rows. When I search the aggr_2014-10-01 index, I see 10,000 records! Somehow, the records are being duplicated (rows are being copied over multiple times). Maybe I can remove duplicate records inside of elastic search? Not sure how I would do that though.
I believe it might be a result of Hive/Qubole spawning two tasks for every mapping. If one mapper succeeds, it tries to kill the other. However, the other task already did damage (aka inserted into ElasticSearch). This is my best guess, but I would prefer to know the exact reason and if there is a way for me to fix it.
set mapred.map.tasks.speculative.execution=false;
One thing I found was to set speculative execution to false, so that only one task is spawned per mapper (see above setting). However, now I am seeing undercounting. I believe this may be due to records being skipped, but I am unable to diagnose why those records would be skipped in the first place.
In this version, it also means that if even one task/mapper fails, the entire job fails, and then I need to delete the index (partial data was uploaded) and rerun the entire job (which takes ~4hours).
[PROGRESS UPDATE]
I attempted to solve this by putting all of the work in the reducer (it's the only way to only spawn one task to ensure no duplicate record insertions).
INSERT OVERWRITE TABLE es_temp_table
SELECT dt, description
FROM other_table
DISTRIBUTE BY cast(rand()*250 as int);
However, I now see a huge underestimate! 2,000 records only now. Elastic search does estimate some things, but not to this extent. There are simply less records in ElasticSearch. This may be due to failed tasks (that are no longer retrying). It may be from when Qubole/Hive passes over malformed entries. But I set:
set mapreduce.map.skip.maxrecords=1000;
Here are some other settings for my query:
set es.nodes=node-names
set es.port=9200;
set es.bulk.size.bytes=1000mb;
set es.http.timeout=20m;
set mapred.tasktracker.expiry.interval=3600000;
set mapred.task.timeout=3600000;
Upvotes: 3
Views: 976
Reputation: 259
I determined the problem. As I suspected, insertion was skipping over some records that were considered "bad." I was never able to find what records exactly were being skipped, but I tried replacing all non-alphanumeric characters with a space. This solved the problem! The records are no longer being skipped, and all data is uploaded to Elastic Search.
INSERT OVERWRITE TABLE es_temp_table
SELECT dt, REGEXP_REPLACE(description, '[^0-9a-zA-Z]+', ' ')
FROM other_table
Upvotes: 1