whatapalaver

Reputation: 915

How to persist a document in json format using elasticsearch-dsl

I am trying to update an existing Elasticsearch data pipeline and would like to use elasticsearch-dsl more fully. In the current process we create a document as a JSON object and then use requests to PUT it to the relevant Elasticsearch index.

I would now like to use the elasticsearch-dsl save method but am left struggling to understand how I might do that when my object or document is constructed as json.

Current Process:

# import_script.py
from objects_handler import ObjectsHandler

index = 'objects'
doc = {"title": "A title", "Description": "Description", "uniqueID": "1234"}
doc_id = doc["uniqueID"]
# Document URL: <host>/<index>/_doc/<id>
elastic_url = 'http://elastic:changeme@localhost:9200/' + index + '/_doc/' + doc_id

api = ObjectsHandler()
api.put(elastic_url, doc)


# objects_handler.py
import requests

class ObjectsHandler():
    def put(self, url, doc):
        # PUT the JSON document to the given Elasticsearch URL
        result = requests.put(url, json=doc)
        if result.status_code != requests.codes.ok:
            print(result.text)
            result.raise_for_status()

Rather than using this PUT method, I would like to use the Document.save functionality available in the DSL, but I can't translate the examples in the API documentation to my use case.

I have amended my ObjectsHandler so that it can create the objects index:

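# objects_handler.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Text, connections

es = Elasticsearch([{'host': 'localhost', 'port': 9200}],
                   http_auth='elastic:changeme')

# Register the existing client as the default elasticsearch-dsl connection
connections.add_connection('default', es)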

class Object(Document):
    physicalDescription = Text()
    title = Text()
    uniqueID = Text()

    class Index:
        name = 'objects'
        using = es

class ObjectsHandler():

    def init_mapping(self, index):
        Object.init(using=es, index=index)

This successfully creates an index when I call api.init_mapping(index) from the importer script.

The documentation has this as an example for persisting the individual documents, where Article is the equivalent to my Object class:

# create and save an article
article = Article(meta={'id': 42}, title='Hello world!', tags=['test'])
article.body = ''' looong text '''
article.published_from = datetime.now()
article.save()

Is it possible for me to use this methodology but to persist my pre-constructed json object doc, rather than specifying individual attributes? I also need to be able to specify that the document id is the doc uniqueID.

I've extended my ObjectsHandler to include a save_doc method:

def save_doc(self, document, doc_id, index):
    new_obj = Object(meta={'id': doc_id},
                     title="hello", uniqueID=doc_id,
                     physicalDescription="blah")
    new_obj.save()

This does save the object with uniqueID as its id, but I am unable to use the JSON object passed in to the method as document.

Upvotes: 0

Views: 2163

Answers (1)

whatapalaver

Reputation: 915

I've had some success with this by using the elasticsearch-py bulk API rather than elasticsearch-dsl. The following resources were super helpful:

In my question I was referring to a single doc:

doc = {"title": "A title", "Description": "Description", "uniqueID": "1234"}

I actually have an array or list of one or more docs, e.g.:

documents = [{"title": "A title", "Description": "Description", "uniqueID": "1234"}, {"title": "Another title", "Description": "Another description", "uniqueID": "1235"}]

I build up a body for the bulk import and append the id:

bulk_body = []
for document in documents:
    # Action line with the id, followed by the document itself
    bulk_body.append({'index': {'_id': document["uniqueID"]}})
    bulk_body.append(document)
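
The loop above can also be wrapped in a small function so that the importer does not need to manage the list itself. This is only a sketch: the name build_bulk_body and the id_field parameter are my own for illustration and are not part of elasticsearch-py.

```python
def build_bulk_body(documents, id_field="uniqueID"):
    """Return the alternating action/source list used as a bulk request body."""
    bulk_body = []
    for document in documents:
        # Action line: index this document under its own unique ID.
        bulk_body.append({"index": {"_id": document[id_field]}})
        # Source line: the document itself, unchanged.
        bulk_body.append(document)
    return bulk_body


documents = [
    {"title": "A title", "Description": "Description", "uniqueID": "1234"},
    {"title": "Another title", "Description": "Another description", "uniqueID": "1235"},
]
bulk_body = build_bulk_body(documents)
```

Each document contributes two entries to the list, so two docs produce a body of four items.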

then pass it to my new save_docs method, which calls the client's bulk method:

api_handler.save_docs(bulk_body, 'objects')

with my objects_handler.py file looking like:

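# objects_handler.py
import json

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Text, connections

es = Elasticsearch([{'host': 'localhost', 'port': 9200}],
                   http_auth='elastic:changeme')

# Register the existing client as the default elasticsearch-dsl connection
connections.add_connection('default', es)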

class Object(Document):
    physicalDescription = Text()
    title = Text()
    uniqueID = Text()

    class Index:
        name = 'objects'
        using = es

class ObjectsHandler():

    def init_mapping(self, index):
        Object.init(using=es, index=index)

    def save_docs(self, docs, index):
        # docs is the alternating action/document list built by the importer
        print("Attempting to index the list of docs using es.bulk()")
        resp = es.bulk(index=index, body=docs)
        print("es.bulk() RESPONSE:", json.dumps(resp, indent=4))

This works for a single doc in JSON format or for multiple docs.
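
For the original elasticsearch-dsl question, one possible approach is to unpack the dict into the Document constructor, since the documentation example passes field values as keyword arguments alongside meta. A rough sketch follows; the helper name to_dsl_document and its id_field parameter are made up for illustration, and I am assuming that keys not declared on the class (such as "Description") are acceptable for your mapping.

```python
def to_dsl_document(doc_cls, doc, id_field="uniqueID"):
    """Build a doc_cls instance from a plain dict, using doc[id_field] as the id."""
    # Field values are passed as keyword arguments, so the dict can be
    # unpacked instead of naming each attribute by hand.
    return doc_cls(meta={"id": doc[id_field]}, **doc)


# Intended usage with the Object class from objects_handler.py (not run here):
#   new_obj = to_dsl_document(Object, doc)
#   new_obj.save()
```

This would not work if the dict itself contained a key called "meta", because that would clash with the meta argument.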

Upvotes: 0
