Reputation: 688
I'm working on a filter for "389 Directory Server" logs, the logs show a history of a users operation on the server, connection, search, add, modify, etc... I'm using the aggregate filter to combine all this log lines into one single event.
However, I want the final @timestamp of the event (after the user has disconnected) to be the @timestamp of the first event (when the connection was first made) I've tried using the date filter, and although it does change the @timestamp of each event (each log line), the final map that the aggregate filter produces still uses the time when the log was processed.
I can save the first @timestamp to another filed in the map, but how can I replace the @timestamp with that field?
Since the filter is long, I'll include the start and end only:
filter {
grok {
match => { "message" => [
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} fd=%{NUMBER:file_descriptor} slot=%{NUMBER} %{WORD:connection_method} connection from %{IP:source} to %{IP:destination}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} %{NOTSPACE:ssl_version} (?<encryption_method>%{NOTSPACE} %{NOTSPACE})$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} dn=%{QUOTEDSTRING:user_dn} method=%{NOTSPACE:bind_method} version=%{NUMBER:ldap_version}($)?(mech=%{NOTSPACE:auth_mechanism}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} err=%{NUMBER:error_code} tag=%{NUMBER:tag_number} nentries=%{NUMBER:number_of_entries} etime=%{NUMBER:operation_time}($)?(dn=%{QUOTEDSTRING}$)?",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} base=%{QUOTEDSTRING:search_base} scope=%{NUMBER:search_scope} filter=%{QUOTEDSTRING:search_filter} attrs=%{QUOTEDSTRING:search_attributes}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation}$",
"^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} fd=%{NUMBER:file_descriptor} %{WORD:connection_result} - %{WORD:connection_code}$"
]
}
}
if "" in [connection_method] {
aggregate {
task_id => "%{connection_id}"
code => "
map['timestamp'] = event['@timestamp']
map['tags'] ||= ['aggregated']
map['source'] = event['source']
map['destination'] = event['destination']
map['file_descriptor'] = event['file_descriptor']
map['connection_method'] = event['connection_method']
"
map_action => "create"
}
}
else if "" in [connection_code] {
mutate {
add_tag => [ "map_finished" ]
}
aggregate {
task_id => "%{connection_id}"
code => "
map['operations'][event['op_number']]['connection_code'] = event['connection_code']
map['operations'][event['op_number']]['connection_result'] = event['connection_result']
"
map_action => "update"
}
}
else {
aggregate {
task_id => "%{connection_id}"
code => "
map['@timestamp'] = map['timestamp']
"
timeout => 0
push_map_as_event_on_timeout => true
}
}
}
Upvotes: 0
Views: 1003
Reputation: 21
You can also update your aggregated event timestamp according to a particular event timestamp with below code line. But if you are using log file timestamp as your event timestamp then make sure that you apply the date filter before the aggregate operation.
map['@timestamp'] ||= event.get('@timestamp');
Upvotes: 2
Reputation: 688
Figured it out, what I didn't realize/understand was that when I reach the final event and push out the map, it (the map) will be processed by logstash as a new event (like a new log line from a file) and logstash will try to match that event to one of the filters, and fail since the final map doesn't hold massage field that will match to any filter.
Creating a new filter fixed the problem.
filter {
if "aggregated" in [tags] {
date {
match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
target => "@timestamp"
add_tag => [ "tmatch" ]
}
}
}
Upvotes: 0
Reputation: 568
using date in filter ?
ISO8601 means your date type
date {
# timezone => "America/Asuncion"
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
Upvotes: 0