isa

Reputation: 111

Create Field using fluentd

I have logs that I am consuming with Fluentd and sending to Elasticsearch. I would like to create a new field if a string is found.

Sample log:

{
  "@timestamp": "2021-01-29T08:05:38.613Z",
  "@version": "1",
  "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
  "level": "INFO"
}

I would like to create a new field STARTIME and the value, in this case, would be 113.187

What I have tried is, used the record_transformer and ruby split to get the value but it seems when it matches it remove the string I want from the log file.

<filter **>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running").last.split(")")}
  </record>
</filter>

How can I create this new field with the desired value?

I have now used the suggested option below:

<filter **>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}
  </record>
</filter>

That got me closer. The STARTIME field is now created, and when a log entry matches the pattern it holds the correct value of 113.187. However, every other line that does not match this pattern also gets its contents added to the new field.
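For illustration, this is roughly what that split expression does in plain Ruby (a minimal sketch; the non-matching message below is made up):

# Matching case: splitting on "JVM running for " isolates the number.
matching = "Started Application in 110.374 seconds (JVM running for 113.187)"
puts matching.split("JVM running for ").last.split(")")[0]   # => "113.187"

# Non-matching case: split finds no delimiter, so .last returns the whole
# message, and that is what ends up in STARTIME.
other = "Stopping service"   # hypothetical log line
puts other.split("JVM running for ").last.split(")")[0]      # => "Stopping service"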


Upvotes: 2

Views: 1630

Answers (2)

Azeem

Reputation: 14637

You can try something like this:

<record>
  STARTIME ${ s = record['message'][/JVM running for \d{3}\.\d{3}/]; s ? s.split(' ')[-1] : nil }
</record>

STARTIME will have the extracted value when the pattern matches, and null otherwise.
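As a quick sanity check, the same expression can be exercised in plain Ruby (a small sketch; the non-matching message is made up):

msg = "Started Application in 110.374 seconds (JVM running for 113.187)"
s = msg[/JVM running for \d{3}\.\d{3}/]      # => "JVM running for 113.187"
puts s ? s.split(' ')[-1] : nil              # => 113.187

other = "Stopping service"                   # hypothetical non-matching line
s = other[/JVM running for \d{3}\.\d{3}/]    # => nil
p s ? s.split(' ')[-1] : nil                 # => nil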

Upvotes: 1

Evaldas Buinauskas

Reputation: 14097

This may not be a direct answer using Fluentd transformations, but you could use an Elasticsearch ingest pipeline with a grok processor to extract the data. This is a simulated example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Enrich logs",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "(JVM running for %{NUMBER:start_time})"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-01-29T08:05:38.613Z",
        "@version": "1",
        "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
        "level": "INFO"
      }
    }
  ]
}

_source is the document you provided, and there's a single grok processor that extracts start_time from the message field. Simulating this pipeline returns:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "start_time" : "113.187",
          "@timestamp" : "2021-01-29T08:05:38.613Z",
          "level" : "INFO",
          "@version" : "1",
          "message" : "Started Application in 110.374 seconds (JVM running for 113.187)"
        },
        "_ingest" : {
          "timestamp" : "2021-01-29T14:09:43.447147676Z"
        }
      }
    }
  ]
}

You can see that, after the transformation, your document contains the "start_time" : "113.187" field.
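If you go this route, you would store the pipeline under a name and reference it at index time. A minimal sketch, assuming an arbitrary pipeline id of enrich-logs:

PUT _ingest/pipeline/enrich-logs
{
  "description": "Enrich logs",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "(JVM running for %{NUMBER:start_time})"
        ]
      }
    }
  ]
}

The Fluentd elasticsearch output plugin exposes a pipeline parameter (check your plugin version) that can be set to this id, so documents are run through the pipeline as they are indexed.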

Upvotes: 0
