Reputation: 1
I'm using filebeat - 6.5.1, Logstash - 6.5.1 and elasticsearch - 6.5.1
I'm using multiple GROK in the single config file and trying to send the logs into Elasticsearch
Below is my Filebeat.yml
filebeat.prospectors:
type: log
paths:
var/log/message
fields:
type: apache_access
tags: ["ApacheAccessLogs"]
type: log
paths:
var/log/indicate
fields:
type: apache_error
tags: ["ApacheErrorLogs"]
type: log
paths:
var/log/panda
fields:
type: mysql_error
tags: ["MysqlErrorLogs"]
output.logstash:
The Logstash hosts
hosts: ["logstash:5044"]
Below is my logstash config file -
input {
beats {
port => 5044
tags => [ "ApacheAccessLogs", "ApacheErrorLogs", "MysqlErrorLogs" ]
}
}
filter {
if "ApacheAccessLogs" in [tags] {
grok {
match => [
"message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}",
"message" , "%{COMMONAPACHELOG}+%{GREEDYDATA:extra_fields}"
]
overwrite => [ "message" ]
}
mutate {
convert => ["response", "integer"]
convert => ["bytes", "integer"]
convert => ["responsetime", "float"]
}
geoip {
source => "clientip"
target => "geoip"
add_tag => [ "apache-geoip" ]
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
remove_field => [ "timestamp" ]
}
useragent {
source => "agent"
}
}
if "ApacheErrorLogs" in [tags] {
grok {
match => { "message" => ["[%{APACHE_TIME:[apache2][error][timestamp]}] [%{LOGLEVEL:[apache2][error][level]}]( [client %{IPORHOST:[apache2][error][client]}])? %{GREEDYDATA:[apache2][error][message]}",
"[%{APACHE_TIME:[apache2][error][timestamp]}] [%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}] [pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?]( [client %{IPORHOST:[apache2][error][client]}])? %{GREEDYDATA:[apache2][error][message1]}" ] }
pattern_definitions => {
"APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
}
remove_field => "message"
}
mutate {
rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
}
date {
match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
remove_field => "[apache2][error][timestamp]"
}
}
if "MysqlErrorLogs" in [tags] {
grok {
match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} ([%{DATA:[mysql][error][level]}] )?%{GREEDYDATA:[mysql][error][message]}",
"%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} [%{DATA:[mysql][error][level]}] %{GREEDYDATA:[mysql][error][message1]}",
"%{GREEDYDATA:[mysql][error][message2]}"] }
pattern_definitions => {
"LOCALDATETIME" => "[0-9]+ %{TIME}"
}
remove_field => "message"
}
mutate {
rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
}
mutate {
rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
}
date {
match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
remove_field => "[apache2][access][time]"
}
}
}
output {
if "ApacheAccessLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "apacheaccess"
}
}
if "ApacheErrorLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "apacheerror"
}
}
if "MysqlErrorLogs" in [tags] {
elasticsearch { hosts => ["elasticsearch:9200"]
index => "apache"
document_id => "sqlerror"
}
}
stdout { codec => rubydebug }
}
The data is sent to elastic search but only 3 records are getting created for each document_id in the same index.
Only 3 records are created and every new logs incoming are overwritten onto the same document_id and the old one is lost.
Can you guys please help me out?
Upvotes: 0
Views: 216
Reputation: 61
The definition of document_id is to provide an unique document id for an event. In your case, as they are static (apacheaccess, apacheerror, sqlerror), there will be only 1 event per index ingested into elasticsearch, overide by the newest event.
As you have 3 distinct data type, what you seems to be looking for provide for each event type (ApacheAccessLogs, ApacheErrorLogs, MysqlErrorLogs) a different index, as following :
output {
if "ApacheAccessLogs" in [tags] {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "apache-access"
}
}
if "ApacheErrorLogs" in [tags] {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "apache-error"
}
}
if "MysqlErrorLogs" in [tags] {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "mysql-error"
}
}
stdout {
codec => rubydebug
}
}
There are not many cases where you need to set the id manually (eg. in case of reingest of data), as Logstash & Elasticsearch will manage that by themself.
But if that's the case, and you can't use a field to identify each event individually, you could use the logstash-filter-fingerprint, that is made for that.
Upvotes: 2