Sukanya Acharya
Sukanya Acharya

Reputation: 81

Load data from MongoDb to Elasticsearch through python

I have some JSON data loaded in MongoDB, e.g. doc1 = {"id": 1,"name": "x1"}, doc2 = {"id": 2,"name": "x2"}, doc3 = {"id": 3,"name": "x3"}. Now I want to import this data from MongoDB into Elasticsearch. I wrote this piece of code:

# Open the source: MongoDB database 'light-test', collection 'test'.
mgclient = MongoClient()
db = mgclient['light-test']
col = db['test']

# Open the destination and print the cluster info as a connectivity check.
es1 = Elasticsearch()
print ("Connected", es1.info())

# Create the target index. ignore=400 presumably keeps a re-run from failing
# when the index already exists -- confirm against the client documentation.
es1.indices.create(index='light-test', ignore=400)

# Pull from mongo and dump into ES using bulk API
actions = []
for data in tqdm(col.find(), total=col.count()):
    # Drop Mongo's own '_id' so only the remaining fields are sent as _source.
    data.pop('_id')
    action = {
        "_index": 'light-test',
        "_type": 'test',
        "_source": data
    }
    actions.append(action)
    print("complete")

    # Dump x number of objects at a time: a batch is sent only once 100
    # actions have accumulated. deque(..., maxlen=0) just drains the iterable
    # returned by parallel_bulk and discards the per-item results.
    if len(actions) >= 100:
        deque(parallel_bulk(es1, actions), maxlen=0)
        actions = []

# NOTE(review): nothing after the loop sends the actions still sitting in
# `actions`. With fewer than 100 documents (e.g. the three sample docs) the
# bulk call above is never reached, so nothing is indexed -- which matches
# the empty hits described below.
print("done")

# NOTE(review): this search runs immediately after indexing; newly indexed
# documents may not be visible until the index is refreshed -- TODO confirm.
a = es1.search(index='light-test', body={
  'query': {
    'match_all': {
     }
  }
})
print(a)

The problem is in the query result: the hits come back empty, whereas it should have returned the JSON documents (results).

Please help me import the data from MongoDB into Elasticsearch.

Upvotes: 0

Views: 3572

Answers (1)

Sukanya Acharya
Sukanya Acharya

Reputation: 81

# Flask application that later exposes the search endpoint.
app = Flask(__name__)

# Connection string is elided in the post ('...' is a placeholder).
MONGO_URL = '...'
# NOTE(review): ssl.CERT_NONE turns off verification of the server's
# certificate; confirm this is acceptable outside of local testing.
mgclient = MongoClient(MONGO_URL, ssl=True, ssl_cert_reqs=ssl.CERT_NONE)
db = mgclient['light']
col = db['task']

# Sample documents; their contents are elided in the post. Presumably each is
# a dict with at least a 'name' field, since later code reads obj['name'].
doc1 = {...}
doc2 = {...}
doc3 = {...}
# Seed the collection. Despite its name, post_id holds whatever insert_many
# returns for all three documents, not a single id.
post_id = col.insert_many([doc1, doc2, doc3])

# Sanity check: show how many documents the collection now holds.
print(col.count())

# Elasticsearch client; connection arguments are elided in the post.
es1 = Elasticsearch(...)
# Fetch cluster info once; the result is kept in ESinfo.
ESinfo=(es1.info())

# Build the bulk request body from every document in the Mongo collection.
# Entries alternate: an "index" action/metadata dict, then the document
# that action applies to.
actions = []
for data in tqdm(col.find(), total=col.count()):
    # Strip Mongo's '_id' so it is not sent as part of the document body.
    data.pop('_id')
    action = {"index": {"_index": 'light', "_type": 'task'}}
    actions.extend((action, data))

# To start from a clean index, delete it first:
#     es1.indices.delete(index='light')
request_body = {"settings": {"number_of_shards": 1, "number_of_replicas": 0}}

# ignore=400 lets the create call pass when it answers with HTTP 400
# (presumably "index already exists" on a re-run).
es1.indices.create(index='light', body=request_body, ignore=400)

# Send everything in one request; refresh=True asks for the index to be
# refreshed as part of the call.
res = es1.bulk(index='light', body=actions, refresh=True)

# Collect the 'name' field of every document in the collection and print the
# complete list once. (The previous version also opened a cursor that was
# never read, and printed the partially built list on every iteration.)
# Raises KeyError if a document has no 'name' field.
names = [obj['name'] for obj in col.find()]
print(names)

@app.route('/query')
def Query():
    """Run a `match` query on the `name` field of the 'light' index.

    Returns the raw Elasticsearch response wrapped in a JSON object under
    the key ``query``. The search term is a placeholder ('...').
    """
    search_body = {'query': {'match': {'name': '...'}}}
    response = es1.search(index='light', body=search_body)
    return jsonify(query=response)
    
if __name__ == "__main__":
    # Start the Flask server only when run as a script; host '0.0.0.0'
    # makes it listen on all network interfaces, on port 1024.
    app.run(port=1024, host='0.0.0.0')
  

This has helped. Thank you :)

Upvotes: 3

Related Questions