Reputation: 284
I'm having trouble configuring Logstash properly. There are two lines in the postfix logs that I care about:
Jun 14 09:06:22 devmailforwarder postfix/smtp[1994]: A03CA9F532: to=<[email protected]>, relay=server[0.0.0.0]:25, delay=0.02, delays=0.01/0.01/0/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as A0B4D5C49)
Jun 14 09:15:04 devmailforwarder postfix/cleanup[2023]: 0E1969F533: warning: header Subject: subjectline from server[0.0.0.0]; from=<[email protected]> to=<[email protected]> proto=SMTP helo=<server>
My grok filter patterns are:
POSTFIX_QUEUEID ([0-9A-F]{6,}|[0-9a-zA-Z]{15,})
POSTFIX_STATUS (?<=status=)(.*)(?= \()
POSTFIX_PROCESS (?=postfix\/)(.*?\[)(.*?)(?=: )
POSTFIX_TO (?<=to=<)(.*?)(?=>,)
POSTFIX_RELAY (?<=relay=)(.*?)(?=,)
POSTFIX_SUBJECT (?<=Subject: )(.*)(?= from )
SMTP ^%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{DATA:hostname}%{SPACE}%{POSTFIX_PROCESS:process}%{GREEDYDATA}%{POSTFIX_QUEUEID:queueid}%{GREEDYDATA}%{POSTFIX_TO:to}%{GREEDYDATA}%{POSTFIX_RELAY:relay}%{GREEDYDATA}%{POSTFIX_STATUS:status}%{SPACE}%{GREEDYDATA:response}
CLEANUP ^%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{DATA:hostname}%{SPACE}%{POSTFIX_PROCESS:process}:%{SPACE}%{POSTFIX_QUEUEID:queueid}%{GREEDYDATA}%{POSTFIX_SUBJECT:subject}%{GREEDYDATA:something2}
(non-working) Logstash Config is:
input {
file {
path => "/var/log/mail.log*"
exclude => "*.gz"
start_position => "beginning"
type => "postfix"
}
}
filter {
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => { "message" => ["%{SMTP}", "%{SUBJECT}"] }
}
if "_grokparsefailure" in [tags] {
drop {}
}
mutate {
add_field => { "logstashSource" => "source-server" }
}
aggregate {
task_id => "%{POSTFIX_QUEUEID}"
code => "
map['to'] ||= event.get('to')
map['from'] ||= event.get('from')
map['relay'] ||= event.get('relay')
map['status'] ||= event.get('status')
map['response'] ||= event.get('response')
map['from'] ||= event.get('timestamp')
map['relay'] ||= event.get('hostname')
map['status'] ||= event.get('process')
map['response'] ||= event.get('queueid')
map['subject'] ||= event.get('subject')
"
map_action => "create_or_update"
push_previous_map_as_event => true
timeout => 2
timeout_tags => ['aggregated']
}
}
output {
if [type] == "postfix" {
file {
path => "/var/log/logstash/postfix.log"
}
}
}
My goal here would be to have one elasticsearch document where each field is populated. The cleanup messages always come in the logs first. The logs are matched by a unique queue ID. I'm struggling with getting the aggregate piece working.
Upvotes: 1
Views: 2848
Reputation: 284
Solved it. Config below. Also needed to update the logstash.yml to add
pipeline.workers: 1
filter {
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => { "message" => ["%{SMTP}", "%{SUBJECT}", "%{CONNECTION}"] }
}
if "_grokparsefailure" in [tags] {
drop {}
}
mutate {
add_field => { "logstashSource" => "logstash-server-name" }
}
if ("" in [queueid]) {
aggregate {
task_id => "%{queueid}"
code => "
map['to'] ||= event.get('to')
map['from'] ||= event.get('from')
map['relay'] ||= event.get('relay')
map['status'] ||= event.get('status')
map['response'] ||= event.get('response')
map['from'] ||= event.get('timestamp')
map['relay'] ||= event.get('hostname')
map['status'] ||= event.get('status')
map['subject'] ||= event.get('subject')
map['queueid'] ||= event.get('queueid')
"
timeout => 2
timeout_tags => ['aggregated']
map_action => 'create_or_update'
push_map_as_event_on_timeout => true
}
}
}
output {
if ("aggregated" in [tags] or "" in [connection])
{
elasticsearch {
index => "postfix-%{+YYYY.MM.dd}"
hosts => "your-es-host-here"
}
file {
path => "/var/log/logstash/postfix.log"
}
}
}
Upvotes: 1