Nayden Van

Reputation: 1569

Logstash aggregation return empty message

I have a testing environment where I test some Logstash plugins before moving them to production.

For now, I am using the Kiwi Syslog Generator to generate some syslog messages for testing.

The fields I have are as follows:

@timestamp
message
+ elastic metadata

Starting from these basic fields, I start filtering my data. The first thing is to add a new field based on the timestamp and the message, as follows:

input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "message_count"]
  }
  mutate {
    add_field => { "newfield" => "%{@timestamp}%{message}" }
  }
}

The prune filter is there just so I don't process unwanted fields.

And this works just fine, as I am getting a new field containing those two values.
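For illustration, an event printed by the rubydebug codec after these two filters might look roughly like this (the message value is invented):

{
    "@timestamp" => 2021-05-04T10:15:00.000Z,
       "message" => "adminuser logged in",
      "newfield" => "2021-05-04T10:15:00.000Zadminuser logged in"
}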

The next step was to run some aggregations based on specific content of the message, such as whether the message contains "logged in" or "logged out",

and to do this, I used the aggregate filter:

grok {
  match => {
    "message" => [
      "(?<[@metadata][event_type]>logged out)",
      "(?<[@metadata][event_type]>logged in)",
      "(?<[@metadata][event_type]>workstation locked)"
    ]
  }
}
aggregate {
  task_id => "%{message}"
  code => "
    map['message_count'] ||= 0; map['message_count'] += 1;
  "
  push_map_as_event_on_timeout => true
  timeout_timestamp_field => "@timestamp"
  timeout => 60
  inactivity_timeout => 50
  timeout_tags => ['_aggregatetimeout']
}

This worked as expected, but I am having a problem here: when the aggregation times out, the only field populated in the resulting event is message_count.

[screenshot: the timed-out aggregation event, with only message_count populated]

As you can see in the above screenshot, newfield and message (the one on the far left; sorry, it didn't fit in the screenshot) are both empty.
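To make that concrete, the pushed timeout event looks roughly like this (a sketch; the count and timestamp are invented):

{
       "@timestamp" => 2021-05-04T10:16:00.000Z,
    "message_count" => 7,
             "tags" => ["_aggregatetimeout"]
}

with no message and no newfield in it at all.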

For demonstration and testing purposes that is absolutely fine, but it will become unmanageable if I get hundreds of syslog messages per second and don't know which message each message_count refers to.

I am struggling here and don't know how to solve this issue. Can somebody please help me understand how I can fill newfield with the content of the message it refers to?

Here is my whole Logstash configuration, to make it easier:

input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "message_count"]
  }
  mutate {
    add_field => { "newfield" => "%{@timestamp}%{message}" }
  }
  grok {
    match => {
      "message" => [
        "(?<[@metadata][event_type]>logged out)",
        "(?<[@metadata][event_type]>logged in)",
        "(?<[@metadata][event_type]>workstation locked)"
      ]
    }
  }
  aggregate {
    task_id => "%{message}"
    code => "
      map['message_count'] ||= 0; map['message_count'] += 1;
    "
    push_map_as_event_on_timeout => true
    timeout_timestamp_field => "@timestamp"
    timeout => 60
    inactivity_timeout => 50
    timeout_tags => ['_aggregatetimeout']
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash_index"
  }
  stdout {
    codec => rubydebug
  }
  csv {
    path => "C:\Users\adminuser\Desktop\syslog\syslogs-%{+yyyy.MM.dd}.csv"
    fields => ["timestamp", "message", "message_count", "newfield"]
  }
}

Upvotes: 0

Views: 163

Answers (1)

Badger

Reputation: 4072

push_map_as_event_on_timeout => true

When you use this and a timeout occurs, the filter creates a new event using the contents of the map. If you want fields from the original messages to be in the new event, then you have to add them to the map. For the task_id there is a shorthand notation to do this using the timeout_task_id_field option on the filter; otherwise you have to explicitly add them:

map['newfield'] ||= event.get('newfield');
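Putting it together, the aggregate filter from the question could look something like this (a sketch; only the code block and the timeout_task_id_field line change):

aggregate {
  task_id => "%{message}"
  code => "
    map['message_count'] ||= 0; map['message_count'] += 1;
    # copy the field from the original event into the map once
    map['newfield'] ||= event.get('newfield');
  "
  push_map_as_event_on_timeout => true
  # shorthand: store the task_id (here, the message) on the timeout event
  timeout_task_id_field => "message"
  timeout_timestamp_field => "@timestamp"
  timeout => 60
  inactivity_timeout => 50
  timeout_tags => ['_aggregatetimeout']
}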

Upvotes: 2
