Chibisuketyan

Reputation: 25

Elastic Ingest Pipeline split field and create a nested field

Dear friendly helpers,

I have an index that is fed from a database via Kafka. This database holds a field that aggregates several pieces of information in the form key/value; key/value; (don't ask for the reason, I have no idea who designed it like that or why ;-) )

93/4; 34/12;

It can be empty, or it can hold 1..n key/value pairs.

I want to use an ingest pipeline and ideally end up with a "nested" field that holds all the values from that field.

Probably like this:

{"categories": { "93": 7, "82": 4 } }

The use case is the following: we want to visualize the sum of a filtered subset of these categories (they tell me how many minutes a specific process took longer than expected) and group them into ranges.

Example: I filter on categories x, y, z and then group how many documents for the day had no delay, which had a delay of up to 5 minutes, and which had a delay between 5 and 15 minutes.

I tried to get the fields neatly separated with the kv processor and wanted to work from there, but I guess it was completely the wrong approach.

"kv": {
      "field": "IncomingField",
      "field_split": ";",
      "value_split": "/",
      "target_field": "delays",
      "ignore_missing": true,
      "trim_key": "\\s",
      "trim_value": "\\s",
      "ignore_failure": true
    }
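For reference, this is how I test the processor in isolation with the simulate API (a minimal sketch; the sample input follows the same key/value; format as above and matches the output I get):

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "kv": {
          "field": "IncomingField",
          "field_split": ";",
          "value_split": "/",
          "target_field": "delays",
          "ignore_missing": true,
          "trim_key": "\\s",
          "trim_value": "\\s",
          "ignore_failure": true
        }
      }
    ]
  },
  "docs": [
    { "_source": { "IncomingField": "62/3; 86/2;" } }
  ]
}
```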

When I test the pipeline, the output looks OK:

"delays": {
            "62": "3",
            "86": "2"
          }

but there are two things that don't work:

  1. I can't know upfront how many of these combinations I will have, so converting the values from string to int in the same pipeline is an issue.
  2. When I create a Kibana index pattern I end up with many fields like delays.82 and delays.82.keyword, which makes no sense at all for the use case, as I can't filter (get only the sum of delays where the key is one of x, y, z) and aggregate.

I have looked into other processors (dot_expander) but can't really get my head around how to get this working.

I hope my question is clear (sorry, my English is limited) and that someone can point me in the right direction.

Thank you very much!

Upvotes: 1

Views: 2210

Answers (1)

Joe - Check out my books

Reputation: 16925

You should rather structure them as an array of objects with shared accessors, for instance:

[ {key: 93, value: 7}, ...]

That way, you'll be able to aggregate on categories.key and categories.value.

So this means iterating over the categories' entrySet() using a custom script processor, like so:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "extracts k/v pairs",
    "processors": [
      {
        "script": {
          "source": """
            def categories = ctx.categories;
            def kv_pairs = new ArrayList();
    
            for (def pair : categories.entrySet()) {
              def k = pair.getKey();
              def v = pair.getValue();
              kv_pairs.add(["key": k, "value": v]);
            }
            
            ctx.categories = kv_pairs;
          """
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "categories": {
          "82": 4,
          "93": 7
        }
      }
    }
  ]
}
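Side note on your first issue: since the kv processor emits the values as strings, you can parse them to integers in the same loop, no matter how many pairs there are. A sketch, assuming the kv output lives under delays as in your test:

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "source": """
            def kv_pairs = new ArrayList();
            for (def pair : ctx.delays.entrySet()) {
              // keys stay strings; values are converted from string to int here
              kv_pairs.add(["key": pair.getKey(), "value": Integer.parseInt(pair.getValue())]);
            }
            ctx.delays = kv_pairs;
          """
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "delays": {
          "62": "3",
          "86": "2"
        }
      }
    }
  ]
}
```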

P.S.: Do make sure your categories field is mapped as nested, because otherwise you'll lose the connection between the keys and the values (the objects get flattened).
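For example, a mapping and query sketch (the index name my-index and the key list are made up; adjust them to your setup):

```json
PUT my-index
{
  "mappings": {
    "properties": {
      "categories": {
        "type": "nested",
        "properties": {
          "key":   { "type": "keyword" },
          "value": { "type": "integer" }
        }
      }
    }
  }
}

GET my-index/_search
{
  "size": 0,
  "aggs": {
    "cats": {
      "nested": { "path": "categories" },
      "aggs": {
        "selected_keys": {
          "filter": { "terms": { "categories.key": [ "62", "86" ] } },
          "aggs": {
            "total_delay": { "sum": { "field": "categories.value" } }
          }
        }
      }
    }
  }
}
```

The sum then only covers values whose key is in the terms list, which matches the "sum of delays where the key is one of x, y, z" part of your use case.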

Upvotes: 3
