Reputation: 190
I am new to Python and Elasticsearch. I wrote some Python code that reads data from a very large JSON file and indexes some of the attributes into Elasticsearch.
import elasticsearch
import json

es = elasticsearch.Elasticsearch()  # use defaults: localhost, port 9200

with open('j.json') as f:
    n = 0
    for line in f:
        try:
            j_content = json.loads(line)
            event_type = j_content['6000000']
            device_id = j_content['6500048']
            raw_event_msg = j_content['6000012']
            event_id = j_content["0"]
            body = {
                '6000000': str(event_type),
                '6500048': str(device_id),
                '6000012': str(raw_event_msg),
                '6000014': str(event_id),
            }
            n = n + 1
            es.index(index='coredb', doc_type='json_data', body=body)
        except (KeyError, ValueError):
            # skip lines that are not valid JSON or are missing a field
            pass
But it's too slow, and I have a lot of free hardware resources. How can I improve the performance of the code with multithreading or bulk?
Upvotes: 4
Views: 3377
Reputation: 14360
What you want is called Cython ;)

You can speed up your code by up to 20x just by enabling static typing on your variables. The code below is what should go into Cython; give it a try, you'll see (a sketch of the typing itself follows the snippet):
try:
    j_content = json.loads(line)  # here you might want to work with Cython structs;
                                  # I can see you have one JSON object per line, so it should be easy
    event_type = j_content['6000000']
    device_id = j_content['6500048']
    raw_event_msg = j_content['6000012']
    event_id = j_content["0"]
    body = {
        '6000000': str(event_type),
        '6500048': str(device_id),
        '6000012': str(raw_event_msg),
        '6000014': str(event_id),
    }
    n = n + 1
except (KeyError, ValueError):
    pass
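For reference, a minimal sketch (untested) of what that static typing can look like in Cython's "pure Python" mode, so the file stays runnable as plain Python; the function and variable names here are illustrative only:

import cython

def count_lines(path: str) -> cython.long:
    # locals annotated with cython types become C variables when
    # the module is compiled with Cython; as plain Python they are no-ops
    n: cython.long = 0
    with open(path) as f:
        for _ in f:
            n += 1
    return n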
Upvotes: 0
Reputation: 3622
You probably want to look into using the Elasticsearch helpers, one in particular called bulk. It seems you are aware of it: instead of having Elasticsearch index the data on every iteration of the loop, collect the documents in a list, and once that list reaches a certain length, send them all at once with the bulk function. This dramatically increases performance.
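As a rough sketch (untested) of that pattern applied to the j.json file from your question, note that bulk also accepts a generator, so you don't even have to manage the list yourself; the index name, doc type, and field keys below are taken from your code, and chunk_size is just an assumption to tune:

from elasticsearch import Elasticsearch, helpers
import json

es = Elasticsearch()

def actions_from_file(path):
    # one JSON object per line, as in the question
    with open(path) as f:
        for line in f:
            j = json.loads(line)
            yield {
                "_index": "coredb",
                "_type": "json_data",
                "_source": {
                    '6000000': str(j['6000000']),
                    '6500048': str(j['6500048']),
                    '6000012': str(j['6000012']),
                    '6000014': str(j["0"]),
                },
            }

helpers.bulk(es, actions_from_file('j.json'), chunk_size=10000)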
You can get a rough idea of the difference from the following example. I had a very large text file with 72,873,471 lines (efficiently counted from the command line with wc -l big-file.txt), and using the same method you posted resulted in an estimated ETA of 10 days:
# Slow method: ~10 days
from elasticsearch import Elasticsearch
import progressbar  # pip3 install progressbar2
import re

es = Elasticsearch()

file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        clean = re.sub("\n", "", line).lstrip().rstrip()
        doc = {'tag1': clean, "tag2": "some extra data"}
        es.index(index="my_index", doc_type='index_type', body=doc)
Now, importing helpers from Elasticsearch cut that time down to 3.5 hours:
# Fast method: ~3.5 hours
from elasticsearch import Elasticsearch, helpers
import progressbar  # pip3 install progressbar2
import re

es = Elasticsearch()

with progressbar.ProgressBar(max_value=72873471) as bar:
    actions = []
    file = open("/path/to/big-file.txt")
    for idx, line in enumerate(file):
        bar.update(idx)
        if len(actions) > 10000:
            helpers.bulk(es, actions)
            actions = []
        clean = re.sub("\n", "", line).lstrip().rstrip()
        actions.append({
            "_index": "my_index",   # the index on Elasticsearch
            "_type": "index_type",  # the document type
            "_source": {'tag1': clean, "tag2": "some extra data"}
        })
    if actions:
        # flush whatever is left over once the loop ends
        helpers.bulk(es, actions)
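Since your question also asks about multithreading: the same helpers module ships a parallel_bulk function that sends the bulk requests from a thread pool. A minimal sketch (untested; thread_count and chunk_size are values to tune for your hardware, not recommendations):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def actions(path):
    with open(path) as f:
        for line in f:
            yield {
                "_index": "my_index",
                "_type": "index_type",
                "_source": {'tag1': line.strip(), "tag2": "some extra data"},
            }

# parallel_bulk returns a lazy generator, so it must be consumed
# for any indexing to actually happen
for ok, info in helpers.parallel_bulk(es, actions("/path/to/big-file.txt"),
                                      thread_count=4, chunk_size=5000):
    if not ok:
        print(info)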
Upvotes: 1