Reputation: 973
I have to store some message in ElasticSearch integrate with my python program. Now what I try to store the message is:
d={"message":"this is message"}
for index_nr in range(1,5):
ElasticSearchAPI.addToIndex(index_nr, d)
print d
That means if I have 10 messages then I have to repeat my code 10 times. So what I want to do is try to make a script file or batch file. I've checked the ElasticSearch Guide, BULK API is possible to use. The format should be something like below:
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }
what I did is:
{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}
I also use curl tool to store the doc.
$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json
Now I want to use my Python code to store the file to the Elastic Search.
Upvotes: 82
Views: 113588
Reputation:
My working code
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import connections
import pandas as pd
# initialize list of lists
data = [['tom', 10, 'NY'], ['nick', 15, 'NY'], ['juli', 14, 'NY'], ['akshay', 30, 'IND'], ['Amit', 14, 'IND']]
# Create the pandas DataFrame
df = pd.DataFrame(data, columns = ['Name', 'Age', 'Country'])
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es_client = connections.create_connection(hosts=['http://localhost:9200/'])
def doc_generator(df):
df_iter = df.iterrows()
for index, document in df_iter:
yield {
"_index": 'age_sample',
"_type": "_doc",
"_source": document,
}
helpers.bulk(es_client, doc_generator(df))
#get data from elastic search
from elasticsearch_dsl import Search
s = Search(index="age_sample").query("match", Name='nick')
Upvotes: 1
Reputation: 3184
There are two options which I can think of at the moment:
1. Define index name and document type with each entity:
es_client = Elasticsearch()
body = []
for entry in entries:
body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
body.append(entry)
response = es_client.bulk(body=body)
2. Provide the default index and document type with the method:
es_client = Elasticsearch()
body = []
for entry in entries:
body.append({'index': {'_id': entry['id']}})
body.append(entry)
response = es_client.bulk(index='my_index', doc_type='doc', body=body)
Works with:
ES version:6.4.0
ES python lib: 6.3.1
Upvotes: 12
Reputation: 13450
Although @justinachen 's code helped me start with py-elasticsearch, after looking in the source code let me do a simple improvement:
es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
action = {
"_index": "tickets-index",
"_type": "tickets",
"_id": j,
"_source": {
"any":"data" + str(j),
"timestamp": datetime.now()
}
}
actions.append(action)
j += 1
helpers.bulk(es, actions)
helpers.bulk()
already does the segmentation for you. And by segmentation I mean the chucks sent every time to the server. If you want to reduce the chunk of sent documents do: helpers.bulk(es, actions, chunk_size=100)
Some handy info to get started:
helpers.bulk()
is just a wrapper of the helpers.streaming_bulk
but the first accepts a list which makes it handy.
helpers.streaming_bulk
has been based on Elasticsearch.bulk()
so you do not need to worry about what to choose.
So in most cases, helpers.bulk() should be all you need.
Upvotes: 51
Reputation: 4995
(the other approaches mentioned in this thread use python list for the ES update, which is not a good solution today, especially when you need to add millions of data to ES)
Better approach is using python generators -- process gigs of data without going out of memory or compromising much on speed.
Below is an example snippet from a practical use case - adding data from nginx log file to ES for analysis.
def decode_nginx_log(_nginx_fd):
for each_line in _nginx_fd:
# Filter out the below from each log line
remote_addr = ...
timestamp = ...
...
# Index for elasticsearch. Typically timestamp.
idx = ...
es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
es_fields_vals = (remote_addr, timestamp, url, status)
# We return a dict holding values from each line
es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))
# Return the row on each iteration
yield idx, es_nginx_d # <- Note the usage of 'yield'
def es_add_bulk(nginx_file):
# The nginx file can be gzip or just text. Open it appropriately.
...
es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])
# NOTE the (...) round brackets. This is for a generator.
k = ({
"_index": "nginx",
"_type" : "logs",
"_id" : idx,
"_source": es_nginx_d,
} for idx, es_nginx_d in decode_nginx_log(_nginx_fd))
helpers.bulk(es, k)
# Now, just run it.
es_add_bulk('./nginx.1.log.gz')
This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to. And you can go on expanding on this to tailor to your needs quickly.
Python Elasticsearch reference here.
Upvotes: 43
Reputation: 1551
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
actions = [
{
"_index": "tickets-index",
"_type": "tickets",
"_id": j,
"_source": {
"any":"data" + str(j),
"timestamp": datetime.now()}
}
for j in range(0, 10)
]
helpers.bulk(es, actions)
Upvotes: 152