loretoparisi

Reputation: 16301

ElasticSearch Ingest Pipeline: create and update timestamp field

To create a timestamp field on my indices, following this answer, I have created an Ingest Pipeline that runs only on specific indices:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2"
              ].contains(ctx['_index'])) { return; }
          
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          
        """
      }
    }
  ]
}

Then I set it as the default pipeline in the settings of all indices:

PUT _all/_settings
{
  "index": {
    "default_pipeline": "auto_now_add"
  }
}

After that, I start indexing my objects into those indices. When I query an indexed item, I get it back with the updated_at field set to the time of indexing, like:

{
  _index: 'my_index_1',
  _type: '_doc',
  _id: 'r1285044056',
  _version: 11,
  _seq_no: 373,
  _primary_term: 2,
  found: true,
  _source: {
    updated_at: '2021-07-07 04:35:39',
    ...
  }
}

I would now like to have a created_at field that is set only the first time, so I have tried to change the script above in this way:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
              ].contains(ctx['_index'])) { return; }
          
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          // don't overwrite if present
          if (ctx != null && ctx['created_at'] != null) { return; }
          
          ctx['created_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
  ]
}

but this solution does not seem to work: the condition

if (ctx != null && ctx['created_at'] != null) { return; }

never matches, so created_at is overwritten on every update of the object, just like updated_at, which makes it useless. How can I prevent that and make sure created_at is kept once the Ingest Pipeline has set it?

Upvotes: 3

Views: 5119

Answers (1)

Joe - Check out my books

Reputation: 16933

As described by @Val in this answer:

... the ingest pipeline processor(s) will only operate within the context of the document you're sending, not the one stored (if any).

As such, you won't have access to the underlying _source nor doc because ingest pipelines were designed for the ingest phase, not the update phase.


You can of course keep your auto_now_add pipeline to auto-add updated_at, and you can extend it with created_at (if not already present in the ingest payload) by checking ctx.containsKey, since ctx is essentially a Java Map:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
              ].contains(ctx['_index'])) { return; }
          
          def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          
          // always set updated_at
          ctx['updated_at'] = now;
          
          // add created_at only if nonexistent in the payload
          if (!ctx.containsKey('created_at')) {
            ctx['created_at'] = now;
          }  
        """
      }
    }
  ]
}

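However, this only works as intended the first time you ingest a document! On any later ingest of the same ID the pipeline still sees only the new payload, so created_at is set again unless the payload already contains it.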

Running:

POST my_index_1/_doc/some_id
{ 
  "some": "param"
}

will yield:

{
  "some" : "param",
  "updated_at" : "2021-07-08 10:35:13",
  "created_at" : "2021-07-08 10:35:13"
}
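
To check the containsKey branch without indexing anything, the pipeline can be run through the simulate API. This is a minimal sketch, assuming the auto_now_add pipeline above is already stored; the two _id values and the created_at value in the second document are made up for illustration:

```
# hypothetical documents, used only to exercise both branches of the script
POST _ingest/pipeline/auto_now_add/_simulate
{
  "docs": [
    {
      "_index": "my_index_1",
      "_id": "without_created_at",
      "_source": { "some": "param" }
    },
    {
      "_index": "my_index_1",
      "_id": "with_created_at",
      "_source": { "some": "param", "created_at": "2021-01-01 00:00:00" }
    }
  ]
}
```

The first simulated document should come back with both created_at and updated_at set to the current time, while the second should keep the created_at it was sent with and only gain updated_at. Simulate never reads stored documents, which mirrors what the pipeline sees during a real ingest.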

Now, in order to refresh updated_at each time you update a document, you'll need one more script, this time stored under _scripts rather than _ingest/pipeline:

PUT _scripts/incement_update_at__plus_new_params
{
  "script": {
    "lang": "painless", 
    "source": """
      // add whatever is in the params
      ctx._source.putAll(params);
      
      // refresh updated_at no matter what was in the params
      ctx._source['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    """
  }
}

Then, when you run your _update call, reference the stored script by its id:

POST my_index_1/_update/some_id
{
  "script": {
    "id": "incement_update_at__plus_new_params",
    "params": {
      "your": "new params"
    }
  }
}

which refreshes updated_at without touching created_at and adds any other params:

{
   "some":"param",
   "updated_at":"2021-07-08 10:49:44",    <--
   "created_at":"2021-07-08 10:39:55",
   "your":"new params"                    <--
}
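
If you would rather have a single update call take care of both fields, a scripted upsert is one option. The following is a sketch rather than a tested replacement for the stored script above: it assumes Elasticsearch 7.x or later (typeless _update endpoint) and relies on ctx.op being 'create' when the script runs for a document that does not exist yet. The inline script and the empty upsert body are illustrative:

```
POST my_index_1/_update/some_id
{
  "scripted_upsert": true,
  "upsert": {},
  "script": {
    "lang": "painless",
    "source": """
      def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

      // add whatever is in the params
      ctx._source.putAll(params);

      // set created_at only when the document is being created by this call
      if (ctx.op == 'create') {
        ctx._source['created_at'] = now;
      }

      // refresh updated_at on every call
      ctx._source['updated_at'] = now;
    """,
    "params": {
      "your": "new params"
    }
  }
}
```

On a document that already exists, ctx.op is not 'create', so only updated_at and the params change. Note that a plain index request for the same ID still replaces the whole document, so this only helps if writes go through _update.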

Shameless plug: I discuss pipelines & scripts in great detail in my Elasticsearch Handbook.

Upvotes: 4
