Omid Goudazi

Reputation: 190

improve python elasticsearch performance

I am new to Python and Elasticsearch. I wrote a Python script that reads data from a very large JSON file and indexes some of its attributes into Elasticsearch.

import elasticsearch
import json

es = elasticsearch.Elasticsearch()  # use default of localhost, port 9200

with open('j.json') as f:
    n = 0
    for line in f:
        try:
            j_content = json.loads(line)
            event_type = j_content['6000000']
            device_id = j_content['6500048']
            raw_event_msg = j_content['6000012']
            event_id = j_content["0"]
            body = {
                '6000000': str(event_type),
                '6500048': str(device_id),
                '6000012': str(raw_event_msg),
                '6000014': str(event_id),
            }
            n = n + 1
            es.index(index='coredb', doc_type='json_data', body=body)
        except:
            pass

However, it is too slow, and I have plenty of free hardware resources. How can I improve the performance of the code with multithreading or bulk indexing?

Upvotes: 4

Views: 3377

Answers (2)

Raydel Miranda

Reputation: 14360

What you want is called Cython ;)

You can speed up your code by up to 20x just by enabling static typing for your variables.

The code below should go into Cython; give it a try, you'll see:

try:
    j_content = json.loads(line)       # Here you might want to work with cython structs.
                                       # I can see you have a json per line, so it should be easy
    event_type = j_content['6000000']
    device_id = j_content['6500048']
    raw_event_msg = j_content['6000012']
    event_id = j_content["0"]
    body = {
        '6000000': str(event_type),
        '6500048': str(device_id),
        '6000012': str(raw_event_msg),
        '6000014': str(event_id),
    }
    n = n + 1
except:
    pass
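
For a concrete picture, here is a minimal sketch of what the static-typing step could look like, assuming the parsing loop is moved into a .pyx module compiled with Cython. The module and function names are made up for illustration, and the es.index() call itself would stay in regular Python:

# fast_parse.pyx -- hypothetical module name, compiled with cythonize
import json

def parse_lines(lines):
    cdef int n = 0        # C integer instead of a Python int object
    cdef str line         # statically typed loop variable
    bodies = []
    for line in lines:
        try:
            j_content = json.loads(line)
            bodies.append({
                '6000000': str(j_content['6000000']),
                '6500048': str(j_content['6500048']),
                '6000012': str(j_content['6000012']),
                '6000014': str(j_content["0"]),
            })
            n += 1
        except Exception:
            continue      # skip malformed lines, like the original code
    return n, bodies

Note that json.loads() and the Elasticsearch calls are still ordinary Python, so the gain from static typing alone depends on how much of the loop's time is spent in pure-Python work.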

Upvotes: 0

alexyichu

Reputation: 3622

You probably want to look into the Elasticsearch helpers, in particular the one called bulk; it seems you are already aware of it. Instead of having Elasticsearch index the data on every loop iteration, collect the documents in a list, and once that list reaches a certain length, send them with the bulk function. This dramatically increases performance.

You can get a rough idea from the following example. I had a very large text file with 72873471 lines (counted from the command line with wc -l big-file.txt). Using the same method you posted, the estimated ETA was 10 days:

# Slow Method ~ 10 days
from elasticsearch import Elasticsearch
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        clean = re.sub("\n", "", line).lstrip().rstrip()
        doc = {'tag1': clean, "tag2": "some extra data"}
        es.index(index="my_index", doc_type='index_type', body=doc)

Importing the helpers from Elasticsearch cut that time down to 3.5 hours:

# Fast Method ~ 3.5 hours
from elasticsearch import Elasticsearch, helpers
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
actions = []
file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        if len(actions) > 10000:
            helpers.bulk(es, actions)
            actions = []
        clean = re.sub("\n", "", line).lstrip().rstrip()
        actions.append({
            "_index": "my_index", # The index on Elasticsearch
            "_type": "index_type", # The document type
            "_source": {'tag1': clean, "tag2": "some extra data"}
        })
if actions:
    helpers.bulk(es, actions) # flush whatever is left over after the loop
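
Applied to the JSON file from the question, the same batching pattern would look roughly like this. This is only a sketch: the field keys and index name are copied from the original post, the 10000 batch size mirrors the threshold above, and the error handling is simplified:

import json
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # defaults to localhost:9200
actions = []

with open('j.json') as f:
    for line in f:
        try:
            j_content = json.loads(line)
            actions.append({
                "_index": "coredb",
                "_type": "json_data",
                "_source": {
                    '6000000': str(j_content['6000000']),
                    '6500048': str(j_content['6500048']),
                    '6000012': str(j_content['6000012']),
                    '6000014': str(j_content["0"]),
                },
            })
        except (ValueError, KeyError):
            continue  # skip malformed lines, as the original code does
        if len(actions) >= 10000:
            helpers.bulk(es, actions)
            actions = []

if actions:
    helpers.bulk(es, actions)  # send the final partial batch

For the multithreading part of your question, the helpers module also provides parallel_bulk(), which sends bulk chunks from a thread pool; it returns a generator of (ok, result) tuples that has to be consumed.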

Upvotes: 1
