Reputation: 6204
I am trying to bulk insert a lot of documents into elastic search using the Python API.
import elasticsearch
from pymongo import MongoClient
es = elasticsearch.Elasticsearch()
def index_collection(db, collection, fields, host='localhost', port=27017):
conn = MongoClient(host, port)
coll = conn[db][collection]
cursor = coll.find({}, fields=fields, timeout=False)
print "Starting Bulk index of {} documents".format(cursor.count())
def action_gen():
"""
Generator to use for bulk inserts
"""
for n, doc in enumerate(cursor):
op_dict = {
'_index': db.lower(),
'_type': collection,
'_id': int('0x' + str(doc['_id']), 16),
}
doc.pop('_id')
op_dict['_source'] = doc
yield op_dict
res = bulk(es, action_gen(), stats_only=True)
print res
The documents come from a Mongodb collection and I amusing the function above to do the bulk indexing according to the way explained in the docs.
the bulk indexing goes on filling elastic search with thousands of empty documents. Can anyone tell me what am I doing wrong?
Upvotes: 1
Views: 6898
Reputation: 8718
I've never seen the bulk data put together that way, especially what you're doing with "_source"
. There may be a way to get that to work, I don't know off-hand, but when I tried it I got weird results.
If you look at the bulk api, ES is expecting a meta-data document, then the document to be indexed. So you need two entries in your bulk data list for each document. So maybe something like:
import elasticsearch
from pymongo import MongoClient
es = elasticsearch.Elasticsearch()
def index_collection(db, collection, fields, host='localhost', port=27017):
conn = MongoClient(host, port)
coll = conn[db][collection]
cursor = coll.find({}, fields=fields, timeout=False)
print "Starting Bulk index of {} documents".format(cursor.count())
bulk_data = []
for n, doc in enumerate(cursor):
bulk_data.append({
'_index': db.lower(),
'_type': collection,
'_id': int('0x' + str(doc['_id']), 16),
})
bulk_data.append(doc)
es.bulk(index=index_name,body=bulk_data,refresh=True)
I didn't try to run that code, though. Here is a script I know works, that you can play with, if it helps:
from elasticsearch import Elasticsearch
es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }])
index_name = "test_index"
if es_client.indices.exists(index_name):
print("deleting '%s' index..." % (index_name))
print(es_client.indices.delete(index = index_name, ignore=[400, 404]))
print("creating '%s' index..." % (index_name))
print(es_client.indices.create(index = index_name))
bulk_data = []
for i in range(4):
bulk_data.append({
"index": {
"_index": index_name,
"_type": 'doc',
"_id": i
}
})
bulk_data.append({ "idx": i })
print("bulk indexing...")
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)
print(res)
print("results:")
for doc in es_client.search(index=index_name)['hits']['hits']:
print(doc)
Upvotes: 2