Damian Melniczuk

Reputation: 393

logstash calculate elapsed time not working

I have a file containing a series of messages like this:

component+branch.job                                 2014-09-04_21:24:46   2014-09-04_21:24:49

Each line is a string, some whitespace, the first date and time, some more whitespace, and the second date and time. I'm currently using this filter:

  grok {
    match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ]
  }
  mutate {
    add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"}
    add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"}
  }
  date {
    match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
    add_tag => [ "jobStarted" ]
  }
  date {
    match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
    target => "stop_timestamp"
    remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"]
    add_tag => [ "jobStopped" ]
  }
  elapsed {
    start_tag => "jobStarted"
    end_tag => "jobStopped"
    unique_id_field => "message"
  }

As a result, I get "@timestamp" and "stop_timestamp" fields containing the parsed date and time, plus both tags, but no elapsed time calculation. What am I missing?

UPDATE

I tried splitting the event into two separate events (as @Rumbles suggested), but somehow Logstash creates two identical events:

input {
    stdin { type => "time" }
}
filter {
  grok {
    match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ]
  }
  mutate {
    add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"}
    add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"}
    update => [ "type", "start" ]
  }
  clone {
    clones => ["stop"]
  }
  if [type] == "start" {
    date {
      match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
      target => ["start_timestamp"]
      add_tag => [ "jobStarted" ]
    }
  }
  if [type] == "stop" {
    date {
      match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
      target => "stop_timestamp"
      remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"]
      add_tag => [ "jobStopped" ]
    }
  }
  elapsed {
    start_tag => "jobStarted"
    end_tag => "jobStopped"
    unique_id_field => "message"
    timeout => 15
  }
}

output {
    stdout { codec => rubydebug }
}

Upvotes: 1

Views: 1570

Answers (1)

Rumbles

Reputation: 1393

I've never used this filter, but I have just had a quick read of the documentation, and I think I understand the issue you are having.

From your description, I believe you are trying to run the elapsed filter on a single event. According to the documentation, the filter expects two events, one carrying the start time and the other the end time, with a common ID that lets the filter match them up:

The events managed by this filter must have some particular properties. The event describing the start of the task (the “start event”) must contain a tag equal to ‘start_tag’. On the other side, the event describing the end of the task (the “end event”) must contain a tag equal to ‘end_tag’. Both these two kinds of event need to own an ID field which identify uniquely that particular task. The name of this field is stored in ‘unique_id_field’.
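
To make that concrete, here is a minimal, untested sketch of the two-event shape the documentation describes. The input line format, the rawTime, phase and jobId field names, and the START/END markers are all invented for illustration; only the elapsed options and the date format come from the question. It also assumes that elapsed measures the difference between the two events' @timestamp values, which is why the date filter has no target here.

```
# Hypothetical input, one event per line (format invented for illustration):
#   2014-09-04_21:24:46 START component+branch.job
#   2014-09-04_21:24:49 END component+branch.job
filter {
  grok {
    # rawTime, phase and jobId are made-up field names.
    match => [ "message", "%{NOTSPACE:rawTime}\s+%{WORD:phase}\s+%{NOTSPACE:jobId}" ]
  }
  date {
    # No target, so the parsed time is written to @timestamp.
    match => [ "rawTime", "YYYY-MM-dd_HH:mm:ss" ]
  }
  # Tag each event according to which end of the job it describes.
  if [phase] == "START" {
    mutate { add_tag => [ "jobStarted" ] }
  } else if [phase] == "END" {
    mutate { add_tag => [ "jobStopped" ] }
  }
  elapsed {
    start_tag => "jobStarted"
    end_tag => "jobStopped"
    # Both events carry the same jobId, so the filter can pair them.
    unique_id_field => "jobId"
  }
}
```

The point of the sketch is that the start and end arrive as separate events sharing one ID, rather than one event carrying both tags.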

Each message is considered an event, so you would need to split each message into two events and give each pair of events a unique identifier so the filter can link them back together. It's not exactly a tidy solution (split your event into two events, then reconnect them later), and there may be a better solution that I am not aware of.
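
If the start and stop times always arrive on the same line, one possible alternative that avoids splitting is to compute the difference directly with the ruby filter. This is a sketch rather than a tested configuration. It assumes the two date filters from the question have already run, so that @timestamp holds the start time and stop_timestamp holds the stop time. It also assumes the Logstash 5+ event API (event.get / event.set; older releases used event['field'] instead). The field name job_duration_seconds is made up for this example.

```
filter {
  # Placed after the two date filters from the question, instead of elapsed.
  ruby {
    # Assumption: Logstash 5+ event API. job_duration_seconds is a
    # hypothetical field name, not something the elapsed filter produces.
    code => "
      start = event.get('@timestamp')
      stop  = event.get('stop_timestamp')
      # Both values are timestamps; to_f converts them to epoch seconds.
      event.set('job_duration_seconds', stop.to_f - start.to_f) if start && stop
    "
  }
}
```

Because everything happens inside one event, no unique ID or timeout is needed, at the cost of not getting the tags and fields that elapsed would add.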

Upvotes: 1
