Reputation: 393
I have file containing series of such messages:
component+branch.job 2014-09-04_21:24:46 2014-09-04_21:24:49
It is string, some white spaces, first date and time, some white spaces and second date and time. Currently I'm using such filter:
grok {
match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ]
}
mutate {
add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"}
add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"}
}
date {
match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
add_tag => [ "jobStarted" ]
}
date {
match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
target => "stop_timestamp"
remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"]
add_tag => [ "jobStopped" ]
}
elapsed {
start_tag => "jobStarted"
end_tag => "jobStopped"
unique_id_field => "message"
}
As result I receive "@timestamp" and "stop_timestamp" fields with date time data and two tags, without elapsed time calculation. What I'm missing?
UPDATE
I tried with splitting (as @Rumbles suggested) event on two separate events, but somehow logstash creates two the same events:
input {
stdin { type => "time" }
}
filter {
grok {
match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ]
}
mutate {
add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"}
add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"}
update => [ "type", "start" ]
}
clone {
clones => ["stop"]
}
if [type] == "start" {
date {
match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
target => ["start_timestamp"]
add_tag => [ "jobStarted" ]
}
}
if [type] == "stop" {
date {
match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ]
target => "stop_timestamp"
remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"]
add_tag => [ "jobStopped" ]
}
}
elapsed {
start_tag => "jobStarted"
end_tag => "jobStopped"
unique_id_field => "message"
timeout => 15
}
}
output {
stdout { codec => rubydebug }
}
Upvotes: 1
Views: 1570
Reputation: 1393
I've never used this filter, however I have just had a quick read of the documentation, and I think I understand the issue you are having.
From your description I believe you are trying to run the elapsed filter on one event, from the documentation it would appear that the filter is expecting 2 events, one with the starting time the second with the ending time, with a common id helping the filter to identify when the 2 events match up:
The events managed by this filter must have some particular properties. The event describing the start of the task (the “start event”) must contain a tag equal to ‘start_tag’. On the other side, the event describing the end of the task (the “end event”) must contain a tag equal to ‘end_tag’. Both these two kinds of event need to own an ID field which identify uniquely that particular task. The name of this field is stored in ‘unique_id_field’.
Each message is considered an event, so you would need to split your messages in to two events and have each pair of events have a unique identifier to help the filter to link them back together. It's not exactly a tidy solution (split your event in to two events, and then reconnect them again later) there may be a better solution to this that I am not aware of.
Upvotes: 1