gbs

Reputation: 77

How to create index with new fields, parsing values from field in existing index?

I have an index that contains one document. This document has a field1 with value "B" and a field2 with value "C, D, E" (that is, the value in field2 is comma separated and can have a variable length). I want to create a new index that contains the following three documents: field1: "B" and field2: "C"; field1: "B" and field2: "D"; field1: "B" and field2: "E". I was thinking about using a watcher to reindex the already existing documents and create the new field at the same time, but I'm not sure how to do this, nor whether this is the correct approach.
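
Put differently, the transformation I am after looks roughly like this (illustrative document shapes only, using the field names above):

source_doc = {"field1": "B", "field2": "C, D, E"}

expected_docs = [
    {"field1": "B", "field2": "C"},
    {"field1": "B", "field2": "D"},
    {"field1": "B", "field2": "E"},
]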

Upvotes: 0

Views: 130

Answers (2)

gbs

Reputation: 77

I managed to do this with a version of the following Python script:

import logging
from elasticsearch import Elasticsearch, helpers
import schedule
import time
import sys

# Configure logging to save logs in parse.log file
logging.basicConfig(level=logging.INFO, filename='parse.log', filemode='a', format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configure Elasticsearch connection
es = Elasticsearch(hosts=["https://elasticsearch.com:111"], basic_auth=("elasticsearch_user", "elasticsearch_password"))

# Define source and destination index names
source_index = "source_index"
dest_index = "dest_index"

# Optional query to filter documents from source index
query = {
    "size": 5000,  # Adjust the size based on your requirements
    "query": {
        "range": {
            "@timestamp": {
                "gte": "now-1d/d",
                "lte": "now/d"
            }
        }
    }
}

# Use the helpers.bulk API for efficient indexing
def job():
    actions = []
    for hit in es.search(index=source_index, body=query)["hits"]["hits"]:
        # Modify document if needed before adding to actions
        source = hit["_source"]

        # Extract the values of field1 and field2
        field1 = source["field1"]
        field2 = source["field2"]

        # Split field2 into its individual comma-separated values
        field1_string = str(field1)
        field2_string = str(field2)
        nodes = [node.strip() for node in field2_string.split(",") if node.strip()]

        # Build the documents; the id depends on the number of nodes in field2
        if len(nodes) == 0:
            # field2 was empty: keep a single copy of the document with an empty host
            source["host"] = ""
            actions.append({
                "_index": dest_index,
                "_id": hit["_id"],
                "_source": source.copy()  # Create a copy of the source dictionary
            })
        else:
            # One document per node, with an id derived from field1 and the node value
            for node in nodes:
                new_id = field1_string + node
                source_copy = source.copy()  # Create a copy of the source dictionary
                source_copy['host'] = node
                actions.append({
                    "_index": dest_index,
                    "_id": new_id,
                    "_source": source_copy
                })

    # Send all the documents to the destination index using bulk API
    try:
        helpers.bulk(es, actions)
        logger.info(f"Bulk indexing successful. Processed {len(actions)} documents.")
    except Exception as e:
        logger.error(f"Error during bulk indexing: {e}")

def main():
    # Schedule the job to run every day at 7 am
    schedule.every().day.at("07:00").do(job)

    try:
        # Run the scheduler in an infinite loop
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        # Handle Keyboard Interrupt (Ctrl+C)
        logger.info("Received KeyboardInterrupt. Exiting gracefully.")
        sys.exit(0)

if __name__ == "__main__":
    main()
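
The search call in job() returns at most the number of hits allowed by the "size" value in the query (5,000 here). If the daily window can contain more documents than that, helpers.scan from the same elasticsearch package pages through every matching hit instead. A minimal sketch, reusing the es client, source_index and date-range filter defined above (job_with_scan is just an illustrative name, and the per-document splitting logic is elided):

def job_with_scan():
    # helpers.scan scrolls through all matching documents, so no "size" cap applies.
    scan_query = {
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "now-1d/d",
                    "lte": "now/d"
                }
            }
        }
    }
    actions = []
    for hit in helpers.scan(es, index=source_index, query=scan_query):
        source = hit["_source"]
        # ... same field1/field2 splitting and actions.append(...) logic as in job() ...
    helpers.bulk(es, actions)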

Upvotes: 0

Musab Dogan

Reputation: 3580

You can use an ingest pipeline with split and script processors.

add dummy data

POST _bulk
{"index":{"_index":"source_index","_id":"1"}}
{"field1": "B", "field2": "C,D,E,F,G"}

create the ingest pipeline

PUT _ingest/pipeline/split-pipeline
{
  "description": "Split values in field2 dynamically",
  "processors": [
    {
      "split": {
        "field": "field2",
        "separator": ","
      }
    },
    {
      "script": {
        "source": """
          for (int i = 0; i < ctx.field2.size(); i++) {
            ctx["field2_" + i] = ctx.field2[i];
          }
        """
      }
    },
    {
      "remove": {
        "field": "field2"
      }
    }
  ]
}

reindex the data

POST _reindex
{
  "source": {
    "index": "source_index"
  },
  "dest": {
    "index": "destination_index",
    "pipeline": "split-pipeline"
  }
}

search the new data

GET destination_index/_search

OUTPUT:

"hits": [
  {
    "_index": "your_destination_index",
    "_id": "1",
    "_score": 1,
    "_source": {
      "field1": "B",
      "field2_4": "G",
      "field2_3": "F",
      "field2_2": "E",
      "field2_1": "D",
      "field2_0": "C"
    }
  }
]

Note: For a real-time data flow, you can add the ingest pipeline to your source_index settings as the default pipeline.

PUT source_index/_settings
{
  "index.default_pipeline": "split-pipeline"
}

For the existing data, you can use the _update_by_query API call.

POST source_index/_update_by_query?pipeline=split-pipeline
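
The same calls can also be issued from Python with the client used in the other answer; a rough sketch, assuming elasticsearch-py 8.x and the index/pipeline names from the snippets above (connection details are placeholders):

from elasticsearch import Elasticsearch

# Placeholder connection details; adjust to your cluster.
es = Elasticsearch(hosts=["https://elasticsearch.com:111"],
                   basic_auth=("elasticsearch_user", "elasticsearch_password"))

# Reindex source_index into destination_index through split-pipeline.
es.reindex(source={"index": "source_index"},
           dest={"index": "destination_index", "pipeline": "split-pipeline"})

# Apply the pipeline to the documents already in source_index.
es.update_by_query(index="source_index", pipeline="split-pipeline")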


Upvotes: 0
