In its docstring, elasticsearch.helpers.async_bulk describes itself as a "Helper for the :meth:`~elasticsearch.AsyncElasticsearch.bulk` api that provides a more human friendly interface - it consumes an iterator of actions and sends them to elasticsearch in chunks."
I have been using AsyncElasticsearch.bulk() successfully to send pandas DataFrames to an ES instance:
def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
        yield (json.dumps(record, default=int))

async def send_to_elasticsearch(self, df: DataFrame):
    logger.info(f"{self.stage_name} sending batch to elastic")
    await self.elastic_client.bulk(self._rec_to_actions(df))
However, when it comes to async_bulk, I am getting "index is missing" errors:
async def send_to_elasticsearch(self, df: DataFrame):
    await async_bulk(self.elastic_client, self._rec_to_actions(df))
I tried to tune _rec_to_actions() in several ways, without much effect. For example:
def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        record["index"] = self.index
        yield (json.dumps(record, default=int))
I guess the main problem is that I am not quite sure what an action is in the context of elasticsearch. The notion is everywhere in the documentation, but it has no clear data structure counterpart in the library's source code (none that I could find, anyway).
What exactly is an action, and how should I tune my generator to send df's data to self.index?
environment
Upvotes: 1
Views: 1051
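This documentation made it easier. For async_bulk, each action is a dict rather than a pre-serialized string, with the target index under "_index" and the document under "_source":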
def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        yield {"_index": self.index, "_source": json.dumps(record, default=int)}
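To make the shape of an action more concrete, here is a minimal sketch that runs without an Elasticsearch client. As far as I understand the helpers, keys starting with an underscore (such as "_index" or "_op_type") are metadata and "_source" holds the document. A plain string seems to be treated as a document body with empty metadata, which would explain the "index is missing" errors from the question. The function name records_to_actions, the index name "my-index", and the sample records are hypothetical and only serve the illustration.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def records_to_actions(
    records: Iterable[Dict[str, Any]], index: str
) -> Iterator[Dict[str, Any]]:
    """Yield one action dict per record (hypothetical helper, not library code)."""
    for record in records:
        yield {
            "_op_type": "index",  # optional; assumed to default to "index"
            "_index": index,      # metadata: the target index
            "_source": record,    # the document body itself
        }


# Sample data standing in for df.to_dict(orient="records")
records = [{"name": "a", "value": 1}, {"name": "b", "value": 2}]
actions = list(records_to_actions(records, "my-index"))

# Inspect the first action to see its structure
print(json.dumps(actions[0], indent=2))
```

With a real client, a generator like this would be passed as the second argument of async_bulk, just like self._rec_to_actions(df) above. The sketch leaves "_source" as a dict, while the version above pre-serializes it with json.dumps(record, default=int), presumably to cope with numpy values coming out of the DataFrame.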
Upvotes: 1