fccoelho

Reputation: 6204

How to Bulk index in Elastic Search using the Python API

I am trying to bulk insert a lot of documents into elastic search using the Python API.

import elasticsearch
from elasticsearch.helpers import bulk
from pymongo import MongoClient

es = elasticsearch.Elasticsearch()

def index_collection(db, collection, fields, host='localhost', port=27017):
    conn = MongoClient(host, port)
    coll = conn[db][collection]
    cursor = coll.find({}, fields=fields, timeout=False)
    print "Starting Bulk index of {} documents".format(cursor.count())

    def action_gen():
        """
        Generator to use for bulk inserts
        """
        for n, doc in enumerate(cursor):

            op_dict = {
                '_index': db.lower(),
                '_type': collection,
                '_id': int('0x' + str(doc['_id']), 16),
            }
            doc.pop('_id')
            op_dict['_source'] = doc
            yield op_dict

    res = bulk(es, action_gen(), stats_only=True)
    print res

The documents come from a MongoDB collection, and I am using the function above to do the bulk indexing in the way explained in the docs.

However, the bulk indexing fills Elasticsearch with thousands of empty documents. Can anyone tell me what I am doing wrong?

Upvotes: 1

Views: 6898

Answers (1)

Sloan Ahrens

Reputation: 8718

I've never seen the bulk data put together that way, especially what you're doing with "_source". There may be a way to get that to work, I don't know off-hand, but when I tried it I got weird results.
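
For reference, the shape the question seems to be aiming at with the `elasticsearch.helpers.bulk` helper looks roughly like this (a minimal sketch; the index name, type, and fields here are illustrative, and the final `bulk()` call assumes a reachable cluster):

```python
# Sketch of the per-document action format accepted by
# elasticsearch.helpers.bulk (names are illustrative assumptions).
def make_actions(docs, index_name, doc_type):
    for doc in docs:
        yield {
            '_index': index_name,
            '_type': doc_type,
            '_id': str(doc.pop('_id')),
            '_source': doc,  # the remaining fields become the document body
        }

docs = [{'_id': 1, 'title': 'a'}, {'_id': 2, 'title': 'b'}]
actions = list(make_actions(docs, 'test_index', 'doc'))
# The iterable of actions would then be passed straight to the helper:
# from elasticsearch.helpers import bulk
# bulk(es, actions)
```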

If you look at the bulk API, ES expects a metadata document followed by the document to be indexed, so you need two entries in your bulk data list for each document. Maybe something like:

import elasticsearch
from pymongo import MongoClient

es = elasticsearch.Elasticsearch()

def index_collection(db, collection, fields, host='localhost', port=27017):
    conn = MongoClient(host, port)
    coll = conn[db][collection]
    cursor = coll.find({}, fields=fields, timeout=False)
    print("Starting Bulk index of {} documents".format(cursor.count()))

    bulk_data = []

    for doc in cursor:
        # Action/metadata entry first, then the source document itself
        bulk_data.append({
            'index': {
                '_index': db.lower(),
                '_type': collection,
                '_id': int('0x' + str(doc['_id']), 16),
            }
        })
        doc.pop('_id')  # the ObjectId is not JSON-serializable
        bulk_data.append(doc)

    es.bulk(index=db.lower(), body=bulk_data, refresh=True)

I didn't try running that code, though. Here is a script I know works, which you can play with if it helps:

from elasticsearch import Elasticsearch

es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }])

index_name = "test_index"

if es_client.indices.exists(index_name):
    print("deleting '%s' index..." % (index_name))
    print(es_client.indices.delete(index = index_name, ignore=[400, 404]))

print("creating '%s' index..." % (index_name))
print(es_client.indices.create(index = index_name))

bulk_data = []

for i in range(4):
    bulk_data.append({
        "index": {
            "_index": index_name, 
            "_type": 'doc', 
            "_id": i
        }
    })
    bulk_data.append({ "idx": i })

print("bulk indexing...")
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)
print(res)

print("results:")
for doc in es_client.search(index=index_name)['hits']['hits']:
    print(doc)

Upvotes: 2
