zar3bski

elasticsearch-py: error using async_bulk, index missing from actions

In its docstring, elasticsearch.helpers.async_bulk describes itself as follows:

Helper for the :meth:`~elasticsearch.AsyncElasticsearch.bulk` api that provides a more human friendly interface - it consumes an iterator of actions and sends them to elasticsearch in chunks.
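To make the "iterator of actions ... in chunks" part concrete, here is a minimal sketch of lazy chunking with itertools.islice. The name chunk_actions is hypothetical, and the default of 500 is meant to mirror the helper's chunk_size default as far as I know; the real helper also caps each chunk by byte size (its max_chunk_bytes parameter), which this sketch ignores.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

Action = Dict[str, Any]


def chunk_actions(actions: Iterable[Action], chunk_size: int = 500) -> Iterator[List[Action]]:
    """Hypothetical helper: group an iterator of actions into lists of at most chunk_size."""
    it = iter(actions)  # works with generators, so actions are consumed lazily
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:  # iterator exhausted
            return
        yield chunk


# A generator of 5 actions, grouped into chunks of 2
actions = ({"_index": "my-index", "_source": {"n": i}} for i in range(5))
sizes = [len(chunk) for chunk in chunk_actions(actions, chunk_size=2)]
print(sizes)  # [2, 2, 1]
```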

Context

I have been successfully using AsyncElasticsearch.bulk() to send pandas DataFrames to an ES instance:

import json

from pandas import DataFrame

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        # action/metadata line, followed by the document itself
        yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
        yield (json.dumps(record, default=int))

async def send_to_elasticsearch(self, df: DataFrame):
    logger.info(f"{self.stage_name} sending batch to elastic")
    await self.elastic_client.bulk(self._rec_to_actions(df))
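For reference, the body that bulk() receives here is newline-delimited JSON in which each document line is preceded by an action/metadata line. A pandas-free sketch of the same pairing, using a hypothetical records_to_bulk_lines function that takes a list of dicts in place of the DataFrame:

```python
import json
from typing import Any, Dict, Iterable, Iterator


def records_to_bulk_lines(records: Iterable[Dict[str, Any]], index: str) -> Iterator[str]:
    """Yield alternating action and document lines, like _rec_to_actions above."""
    for record in records:
        # action/metadata line: operation ("index") and target index
        yield json.dumps({"index": {"_index": index}})
        # document line: the record itself
        yield json.dumps(record, default=int)


lines = list(records_to_bulk_lines([{"a": 1}, {"a": 2}], "my-index"))
# The bulk endpoint expects NDJSON terminated by a newline
body = "\n".join(lines) + "\n"
print(body)
```

Two lines per record is the pairing that bulk() expects; async_bulk, on the other hand, expects one item per action.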

Issue

However, when I switch to async_bulk, I get "index is missing" errors:

from elasticsearch.helpers import async_bulk

async def send_to_elasticsearch(self, df: DataFrame):
    await async_bulk(self.elastic_client, self._rec_to_actions(df))

I tried tweaking _rec_to_actions() in several ways, without success, for example:

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        record["index"] = self.index
        yield (json.dumps(record, default=int))
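A likely explanation of the error: as far as I can tell from expand_action in elasticsearch-py, a bare string passed to the bulk helpers is treated as an already-serialized document and paired with an empty {"index": {}} action line, so no target index is sent. In the first generator every yielded line, including the { "index" : ... } line, would then become its own document; in the second, "index" is just another field inside the document. The function below is a simplified approximation written for illustration (expand_action_sketch and the META_KEYS subset are hypothetical), not the library's code:

```python
import json
from typing import Any, Dict, Optional, Tuple, Union

# Illustrative subset of metadata keys lifted out of a dict action (not exhaustive)
META_KEYS = {"_index", "_id", "_routing"}


def expand_action_sketch(
    data: Union[str, Dict[str, Any]]
) -> Tuple[Dict[str, Any], Optional[Any]]:
    """Split one action into (action line, document body). Simplified approximation."""
    if isinstance(data, str):
        # Raw string: assumed to be pre-serialized JSON, sent without any metadata
        return {"index": {}}, data
    data = dict(data)  # avoid mutating the caller's dict
    op_type = data.pop("_op_type", "index")
    meta = {key: data.pop(key) for key in list(data) if key in META_KEYS}
    if op_type == "delete":
        return {op_type: meta}, None  # delete has no document line
    # Document comes from _source if present, otherwise from the remaining keys
    return {op_type: meta}, data.get("_source", data)


# A string action: its "index" field is never seen as metadata
action, body = expand_action_sketch(json.dumps({"index": "my-index", "a": 1}))
print(action)  # {'index': {}}

# A dict action carries its target index as metadata
action, body = expand_action_sketch({"_index": "my-index", "_source": {"a": 1}})
print(action, body)  # {'index': {'_index': 'my-index'}} {'a': 1}
```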

I suspect the main problem is that I don't really know what an action is in the context of Elasticsearch. The notion appears throughout the documentation, but I could not find a corresponding data structure in the library's source code.

What exactly is an action, and how should I adapt my generator to send df's data to self.index?
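In the bulk helpers, an action is a plain dict: a fixed set of underscore-prefixed keys (_index, _id, _op_type, _routing, ...) are metadata, and the document is either the value of _source or the remaining keys, with _op_type defaulting to "index". A few shapes the helpers accept, sketched with placeholder index names and ids:

```python
from typing import Any, Dict, List

# Placeholder index name and ids, for illustration only
actions: List[Dict[str, Any]] = [
    # "index" (the default _op_type), document under _source
    {"_index": "my-index", "_id": "1", "_source": {"title": "hello"}},
    # same operation, document fields given at the top level instead
    {"_index": "my-index", "_id": "2", "title": "world"},
    # partial update of an existing document, changed fields under "doc"
    {"_op_type": "update", "_index": "my-index", "_id": "1", "doc": {"title": "hi"}},
    # delete carries metadata only, no document
    {"_op_type": "delete", "_index": "my-index", "_id": "2"},
]

op_types = [action.get("_op_type", "index") for action in actions]
print(op_types)  # ['index', 'index', 'update', 'delete']
```

Any iterable of such dicts can be handed to async_bulk(client, actions), which expands each one into the action/document line pair shown earlier for bulk().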

environment

Answers (1)

zar3bski

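This documentation made it easier: each action passed to async_bulk is a single dict that carries the metadata (here _index) together with the document (here _source):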

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        # one dict per action: target index plus the document as pre-serialized JSON
        yield {"_index": self.index, "_source": json.dumps(record, default=int)}
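As a variation (my own, not from the documentation), _source can also be a plain dict, in which case the client serializes it; the pre-serialized json.dumps(record, default=int) string above remains a reasonable choice when some values need that default=int fallback. A pandas-free sketch, with a hypothetical records_to_actions taking the list that df.to_dict(orient="records") would return:

```python
from typing import Any, Dict, Iterable, Iterator


def records_to_actions(records: Iterable[Dict[str, Any]], index: str) -> Iterator[Dict[str, Any]]:
    """Yield one bulk-helper action per record: metadata and document in one dict."""
    for record in records:
        yield {"_index": index, "_source": record}


actions = list(records_to_actions([{"a": 1}, {"a": 2}], "my-index"))
print(actions[0])  # {'_index': 'my-index', '_source': {'a': 1}}
```

Inside the class, this would be used as: await async_bulk(self.elastic_client, records_to_actions(df.to_dict(orient="records"), self.index)).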

