Reputation: 437
I am trying to import a large amount of data using Elasticsearch parallel_bulk. This is my Index structure:
{
    "_index" : "myindex",
    "_type" : domain,
    "_id" : md5(email),
    "_score" : 1.0,
    "_source" : {
        "purchase_date" : purchase_date,
        "amount" : amount
    }
}
And this is my Python code:
def insert(input_file):
    paramL = []
    with open(input_file) as f:
        for line in f:
            line = line.rstrip()
            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]
            id_email = getMD5(email)
            doc = {
                "email": email,
                "purchase_date": purchase_date,
                "amount": amount
            }
            ogg = {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }
            paramL.append(ogg)
            if len(paramL) > 500000:
                for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
                    if not success:
                        print "Insert failed: ", info
                # empty paramL if size > 5.000.000
                del paramL[:]
The file contains 42,644,394 rows, and my idea was to insert the data every time the list "paramL" reaches about 5,000,000 elements. When I run the script, it inserts about 436,226 documents before crashing with the following error:
Traceback (most recent call last):
  File "test-2-0.py", line 133, in <module>
    main()
  File "test-2-0.py", line 131, in main
    insert(args.file)
  File "test-2-0.py", line 82, in insert
    for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 668, in next
    raise value
elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'127.0.0.1', port=9200): Read timed out. (read timeout=10))
I also tried to increase the timeout by passing it to the Elasticsearch constructor
es = Elasticsearch(['127.0.0.1'], request_timeout=30)
but the result is the same.
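For reference, a client configured to also retry timed-out requests would look like the sketch below; timeout, max_retries and retry_on_timeout are standard elasticsearch-py constructor options, and the values are only examples, not something I know fixes the root cause.

from elasticsearch import Elasticsearch

# sketch: wait longer per request and retry requests that time out
es = Elasticsearch(
    ['127.0.0.1'],
    timeout=30,              # per-request timeout in seconds
    max_retries=10,          # retry a failed request up to 10 times
    retry_on_timeout=True    # also retry when the failure is a timeout
)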
Upvotes: 0
Views: 5544
Reputation: 3212
Honestly, I have never done a bulk import with that many documents to index, so I can't say exactly why this error appears. In your case I would suggest not building a list (paramL) but feeding your data through a generator function, as described as best practice for large bulk ingests by an Elastic developer on the Elastic forum here: https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3 . Something like this:
def insert(input_file):
    with open(input_file) as f:
        for line in f:
            line = line.rstrip()
            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]
            id_email = getMD5(email)
            doc = {
                "email": email,
                "purchase_date": purchase_date,
                "amount": amount
            }
            yield {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }

for success, info in helpers.parallel_bulk(client=es, actions=insert(input_file), thread_count=4):
    if not success:
        print "Insert failed: ", info
You can also increase the heap space dedicated to Elasticsearch's Java virtual machine by editing the file /etc/elasticsearch/jvm.options.
To allocate 2 GB of RAM, you would change the following lines (if your machine has 4 GB, you should keep about 1 GB for the system, so you could allocate at most 3 GB):
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms2g
-Xmx2g
Then you have to restart the service:
sudo service elasticsearch restart
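To check that the new heap size is actually in effect, one quick option (a sketch that reads the cat nodes API through the same Python client; name, heap.max and heap.percent are standard _cat/nodes columns) is:

# prints each node's configured max heap and its current heap usage
print es.cat.nodes(h='name,heap.max,heap.percent', v=True)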
And try again. Good luck
Upvotes: 1