bit

Reputation: 437

Connection timeout on Elasticsearch parallel_bulk

I am trying to import a large amount of data using the Elasticsearch parallel_bulk helper. This is my index structure:

{
    "_index" : "myindex",
    "_type" : domain,
    "_id" : md5(email),
    "_score" : 1.0,
    "_source" : {
      "purchase_date" : purchase_date,
      "amount" : amount
    }
}

And this is my Python code:

def insert(input_file):
    paramL = []

    with open(input_file) as f:
        for line in f:
            line = line.rstrip()

            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]               

            id_email = getMD5(email)

            doc = {
                "email": email,
                "purchase_date": purchase_date,
                "amount": amount _date
            }

            ogg = {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }

            paramL.append(ogg)    

            if len(paramL) > 500000:
                for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
                    if not success:
                        print "Insert failed: ", info

                # flush done, empty paramL once it holds more than 500,000 actions
                del paramL[:]

The file contains 42,644,394 rows and my idea was to insert the data each time the list "paramL" reaches about 5,000,000 elements. When I run the script it inserts about 436,226 values before crashing with the following error:

Traceback (most recent call last):
  File "test-2-0.py", line 133, in <module>
    main()
  File "test-2-0.py", line 131, in main
    insert(args.file)
  File "test-2-0.py", line 82, in insert
    for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 668, in next
    raise value
elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'127.0.0.1', port=9200): Read timed out. (read timeout=10))

I also tried to increase the timeout by passing it to the Elasticsearch constructor

es = Elasticsearch(['127.0.0.1'], request_timeout=30)

but the result is the same.

Upvotes: 0

Views: 5544

Answers (1)

Lupanoide

Reputation: 3212

Honestly, I have never done a bulk import with so many documents to index, and I don't know why this error appears. In your case I suggest not building a list (paramL) but handling your data with a generator function, as described as best practice for large bulk ingestion on the Elastic forum by an Elastic developer here: https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3 . Something like this:

def insert(input_file):

    with open(input_file) as f:
        for line in f:
            line = line.rstrip()

            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]               

            id_email = getMD5(email)

            doc = {
                "email": email,
                "purchase_attack": purchase_date,
                "amount _relevation": amount _date
            }

            yield {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }



for success, info in helpers.parallel_bulk(client=es, actions=insert(input_file), thread_count=4):
    if not success:
        print "Insert failed: ", info

You can also increase the memory dedicated to Elasticsearch's Java virtual machine by editing /etc/elasticsearch/jvm.options. To allocate 2 GB of RAM you should change the following lines (if your machine has 4 GB, keep at least 1 GB for the system, so you could allocate at most 3 GB):

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

 -Xms2g
 -Xmx2g

Then you have to restart the service

sudo service elasticsearch restart
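Once it is back up, a quick way to confirm that the new heap size was picked up is to read it from the nodes stats API (a minimal sketch, assuming the same es client as above):

# Ask only for the JVM metrics of every node
stats = es.nodes.stats(metric='jvm')

# Print the maximum heap each node reports
for node_id, node_info in stats['nodes'].items():
    print node_info['name'], node_info['jvm']['mem']['heap_max_in_bytes']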

And try again. Good luck

Upvotes: 1
