Reputation: 77
I have an index that contains 1 document. This document has a field1 with value "B" and a field2 with value "C, D, E" (that is, the value in field2 is comma-separated and can have variable length). I want to create a new index that contains the following 3 documents: (1) field1: "B" and field2: "C"; (2) field1: "B" and field2: "D"; (3) field1: "B" and field2: "E". I was thinking about using a watcher to reindex the already existing documents and create the new field at the same time, but I'm not sure how to do this, nor whether this is the correct approach.
Upvotes: 0
Views: 130
Reputation: 77
I managed to do this with a version of the following Python script:
import logging
from elasticsearch import Elasticsearch, helpers
import schedule
import time
import sys
# Append INFO-and-above records to parse.log, one timestamped line per event
logging.basicConfig(
    filename='parse.log',
    filemode='a',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Shared Elasticsearch client (placeholder host and credentials)
es = Elasticsearch(
    hosts=["https://elasticsearch.com:111"],
    basic_auth=("elasticsearch_user", "elasticsearch_password"),
)

# Index the documents are read from, and index the split documents go to
source_index, dest_index = "source_index", "dest_index"

# Only documents whose @timestamp lies between now-1d/d and now/d
# (Elasticsearch date math, rounded to the day) are selected.
_timestamp_window = {"gte": "now-1d/d", "lte": "now/d"}

# Search body used to pick the source documents; "size" caps how many hits
# a single search returns, so adjust it to your requirements.
query = {
    "size": 5000,
    "query": {"range": {"@timestamp": _timestamp_window}},
}
# Use the helpers.bulk API for efficient indexing
def job():
    """Copy matching source documents into ``dest_index``, one per field2 value.

    Runs ``query`` against ``source_index``, expands every hit into one
    document per comma-separated value of ``field2`` and sends the result to
    Elasticsearch with the bulk helper.

    Search and bulk failures are logged rather than raised, so that one bad
    run does not terminate the long-running scheduler process.

    Note: a single search is issued, so at most ``query["size"]`` hits are
    processed per run.
    """
    try:
        hits = es.search(index=source_index, body=query)["hits"]["hits"]
    except Exception:
        # Same best-effort policy as for the bulk call below.
        logger.exception("Error while searching index %s", source_index)
        return

    actions = _build_actions(hits)

    # Send all the documents to the destination index using the bulk API
    try:
        helpers.bulk(es, actions)
    except Exception as e:
        logger.error(f"Error during bulk indexing: {e}")
    else:
        logger.info(f"Bulk indexing successful. Processed {len(actions)} documents.")


def _split_values(raw):
    """Return the non-empty, whitespace-trimmed values contained in *raw*.

    *raw* may be a comma-separated string, a list/tuple of values, or None.
    """
    if raw is None:
        return []
    pieces = raw if isinstance(raw, (list, tuple)) else str(raw).split(",")
    # Strip so that "C, D, E" yields "C", "D", "E"; drop blanks such as the
    # single "" that str.split returns for an empty string.
    return [text for text in (str(piece).strip() for piece in pieces) if text]


def _build_actions(hits):
    """Build the bulk actions for *hits* (search hits with ``_id``/``_source``)."""
    actions = []
    for hit in hits:
        source = hit["_source"]
        if "field1" not in source:
            # field1 is part of the generated _id, so such a hit cannot be split.
            logger.warning("Skipping document %s: it has no field1", hit["_id"])
            continue
        field1_string = str(source["field1"])
        nodes = _split_values(source.get("field2"))

        if not nodes:
            # Nothing to split: keep the document under its original id.
            # NOTE(review): this branch writes "node" while the split branch
            # writes "host" (kept as in the original) - confirm which is intended.
            unsplit = source.copy()
            unsplit["node"] = ""
            actions.append({
                "_index": dest_index,
                "_id": hit["_id"],
                "_source": unsplit,
            })
            continue

        for node in nodes:
            # The id is field1 + value, so re-running the job overwrites the
            # same destination documents instead of duplicating them.
            source_copy = source.copy()  # never mutate the hit itself
            source_copy["host"] = node
            actions.append({
                "_index": dest_index,
                "_id": field1_string + node,
                "_source": source_copy,
            })
    return actions
def main():
    """Register ``job`` to run daily at 07:00 and poll the scheduler forever.

    The loop wakes up once per second to run whatever is due. Ctrl+C is
    logged and turned into a clean exit with status 0.
    """
    daily_at_seven = schedule.every().day.at("07:00")
    daily_at_seven.do(job)

    poll_interval_seconds = 1
    try:
        # Endless polling loop; only an interrupt (or an error) leaves it
        while True:
            schedule.run_pending()
            time.sleep(poll_interval_seconds)
    except KeyboardInterrupt:
        # Ctrl+C: record it and stop without a traceback
        logger.info("Received KeyboardInterrupt. Exiting gracefully.")
        sys.exit(0)


if __name__ == "__main__":
    main()
Upvotes: 0
Reputation: 3580
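You can use an ingest pipeline with a split processor and a script processor. Note that this produces a single document with one field per value (field2_0, field2_1, ...), not one document per value.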
POST _bulk
{"index":{"_index":"source_index","_id":"1"}}
{"field1": "B", "field2": "C,D,E,F,G"}
PUT _ingest/pipeline/split-pipeline
{
"description": "Split values in field2 dynamically",
"processors": [
{
"split": {
"field": "field2",
"separator": ","
}
},
{
"script": {
"source": """
for (int i = 0; i < ctx.field2.size(); i++) {
ctx["field2_" + i] = ctx.field2[i];
}
"""
}
},
{
"remove": {
"field": "field2"
}
}
]
}
POST _reindex
{
"source": {
"index": "source_index"
},
"dest": {
"index": "destination_index",
"pipeline": "split-pipeline"
}
}
GET destination_index/_search
"hits": [
{
"_index": "destination_index",
"_id": "1",
"_score": 1,
"_source": {
"field1": "B",
"field2_4": "G",
"field2_3": "F",
"field2_2": "E",
"field2_1": "D",
"field2_0": "C"
}
}
]
Note: For real-time data flow, you can set the ingest pipeline as the default pipeline in your source_index settings.
PUT source_index/_settings
{
"index.default_pipeline": "split-pipeline"
}
For the existing data, you can use the _update_by_query API call.
POST source_index/_update_by_query?pipeline=split-pipeline
Upvotes: 0