sb9

Reputation: 438

ingest pipeline not preserving the date type field

Here is the JSON data that I am trying to send from Filebeat to the ingest pipeline "logpipeline.json" in OpenSearch.

JSON data

{
   "@timestamp":"2022-11-08T10:07:05+00:00",
   "client":"10.x.x.x",
   "server_name":"example.stack.com",
   "server_port":"80",
   "server_protocol":"HTTP/1.1",
   "method":"POST",
   "request":"/example/api/v1/",
   "request_length":"200",
   "status":"500",
   "bytes_sent":"598",
   "body_bytes_sent":"138",
   "referer":"",
   "user_agent":"Java/1.8.0_191",
   "upstream_addr":"10.x.x.x:10376",
   "upstream_status":"500",
   "gzip_ratio":"",
   "content_type":"application/json",
   "request_time":"6.826",
   "upstream_response_time":"6.826",
   "upstream_connect_time":"0.000",
   "upstream_header_time":"6.826",
   "remote_addr":"10.x.x.x",
   "x_forwarded_for":"10.x.x.x",
   "upstream_cache_status":"",
   "ssl_protocol":"TLSv",
   "ssl_cipher":"xxxx",
   "ssl_session_reused":"r",
   "request_body":"{\"date\":null,\"sourceType\":\"BPM\",\"processId\":\"xxxxx\",\"comment\":\"Process status: xxxxx: \",\"user\":\"xxxx\"}",
   "response_body":"{\"statusCode\":500,\"reasonPhrase\":\"Internal Server Error\",\"errorMessage\":\"xxxx\"}",
   "limit_req_status":"",
   "log_body":"1",
   "connection_upgrade":"close",
   "http_upgrade":"",
   "request_uri":"/example/api/v1/",
   "args":""
}

Filebeat to OpenSearch log shipping

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["192.168.29.117:9200"]
  pipeline: logpipeline
  #index: "filebeatelastic-%{[agent.version]}-%{+yyyy.MM.dd}"
  index: "nginx_dev-%{+yyyy.MM.dd}"
  # Protocol - either `http` (default) or `https`.
  protocol: "https"
  ssl.enabled: true
  ssl.verification_mode: none

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "filebeat"
  password: "filebeat"

In the ingest pipeline I transform some of the fields under "data" by doing type conversion, which works perfectly. The only problem I am facing is with "@timestamp".

The "@timestamp" is of "date" type and once the json data goes through the pipeline i am mapping the json data message to root level json object called "data". In that transformed data the "data.@timestamp" is showing as type "string" even though i haven't done any transformation for it.

OpenSearch ingest pipeline - logpipeline.json

{
  "description" : "Logging Pipeline",
  "processors" : [
    {
      "json" : {
        "field" : "message",
        "target_field" : "data"
      }
    },
    {
      "date" : {
        "field" : "data.@timestamp",
    "formats" : ["ISO8601"]
      }
    },
    {
      "convert" : {
        "field" : "data.body_bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.request_length",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.request_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.upstream_connect_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.upstream_header_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert" : {
        "field" : "data.upstream_response_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ]
}

Is there any way I can preserve the "date" type of "@timestamp" even after the transformation carried out in the ingest pipeline?

Indexed document image: (screenshot of the indexed document, not reproduced here)

Edit 1: Updated ingest pipeline simulate result
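For reference, a result like the one below can be reproduced with the _simulate API; this is a minimal sketch using only a trimmed "message" field (the actual document shown below also carries "@timestamp" and "index_date" at the root):

POST _ingest/pipeline/logpipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "{ \"@timestamp\": \"2022-11-08T10:07:05+00:00\", \"status\": \"500\" }"
      }
    }
  ]
}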

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_id" : "_id",
        "_source" : {
          "index_date" : "2022.11.08",
          "@timestamp" : "2022-11-08T12:07:05.000+02:00",
          "message" : """
        { "@timestamp": "2022-11-08T10:07:05+00:00", "client": "10.x.x.x", "server_name": "example.stack.com", "server_port": "80", "server_protocol": "HTTP/1.1", "method": "POST", "request": "/example/api/v1/", "request_length": "200", "status": "500", "bytes_sent": "598", "body_bytes_sent": "138", "referer": "", "user_agent": "Java/1.8.0_191", "upstream_addr": "10.x.x.x:10376", "upstream_status": "500", "gzip_ratio": "", "content_type": "application/json", "request_time": "6.826", "upstream_response_time": "6.826", "upstream_connect_time": "0.000", "upstream_header_time": "6.826", "remote_addr": "10.x.x.x", "x_forwarded_for": "10.x.x.x", "upstream_cache_status": "", "ssl_protocol": "TLSv", "ssl_cipher": "xxxx", "ssl_session_reused": "r", "request_body": "{\"date\":null,\"sourceType\":\"BPM\",\"processId\":\"xxxxx\",\"comment\":\"Process status: xxxxx: \",\"user\":\"xxxx\"}", "response_body": "{\"statusCode\":500,\"reasonPhrase\":\"Internal Server Error\",\"errorMessage\":\"xxxx\"}", "limit_req_status": "", "log_body": "1", "connection_upgrade": "close", "http_upgrade": "", "request_uri": "/example/api/v1/", "args": ""}
        """,
          "data" : {
            "server_name" : "example.stack.com",
            "request" : "/example/api/v1/",
            "referer" : "",
            "log_body" : "1",
            "upstream_addr" : "10.x.x.x:10376",
            "body_bytes_sent" : 138,
            "upstream_header_time" : 6.826,
            "ssl_cipher" : "xxxx",
            "response_body" : """{"statusCode":500,"reasonPhrase":"Internal Server Error","errorMessage":"xxxx"}""",
            "upstream_status" : "500",
            "request_time" : 6.826,
            "upstream_cache_status" : "",
            "content_type" : "application/json",
            "client" : "10.x.x.x",
            "user_agent" : "Java/1.8.0_191",
            "ssl_protocol" : "TLSv",
            "limit_req_status" : "",
            "remote_addr" : "10.x.x.x",
            "method" : "POST",
            "gzip_ratio" : "",
            "http_upgrade" : "",
            "bytes_sent" : 598,
            "request_uri" : "/example/api/v1/",
            "x_forwarded_for" : "10.x.x.x",
            "args" : "",
            "@timestamp" : "2022-11-08T10:07:05+00:00",
            "upstream_connect_time" : 0.0,
            "request_body" : """{"date":null,"sourceType":"BPM","processId":"xxxxx","comment":"Process status: xxxxx: ","user":"xxxx"}""",
            "request_length" : 200,
            "ssl_session_reused" : "r",
            "server_port" : "80",
            "upstream_response_time" : 6.826,
            "connection_upgrade" : "close",
            "server_protocol" : "HTTP/1.1",
            "status" : "500"
          }
        },
        "_ingest" : {
          "timestamp" : "2023-01-18T08:06:35.335066236Z"
        }
      }
    }
  ]
}

Upvotes: 0

Views: 643

Answers (1)

sb9

Reputation: 438

I was finally able to resolve my issue by updating filebeat.yml with the following. Previously the template name and pattern were different, but the default template name "filebeat" and pattern "filebeat" seem to be doing the job for me:

setup.template.name: "filebeat"
setup.template.pattern: "filebeat"

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false

I still need to figure out how templates work, though.
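As a rough sketch of the explicit alternative (this is not what the Filebeat setup above creates for you; the template name and pattern here are only illustrative), an index template could pin the field type directly via the composable template API:

PUT _index_template/nginx_dev_template
{
  "index_patterns": ["nginx_dev-*"],
  "template": {
    "mappings": {
      "properties": {
        "data": {
          "properties": {
            "@timestamp": { "type": "date" }
          }
        }
      }
    }
  }
}

With an explicit mapping like this in place, dynamic mapping no longer decides the type of data.@timestamp.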

Upvotes: 0
