Reputation: 11
I'm working with Logstash for the first time, trying to ingest JSON reports from amavisd-new for searching and analysis. Amavisd-new can write its JSON log to Redis, and I have everything importing perfectly, so I've started learning my way through all this.
But I have one issue: in the JSON report from amavis, shown below, "recipients" is an array with one entry for each recipient.
I'd like to split the event into one event per recipient, leaving all the other fields the same, but moving the "action", "ccat_main", "queued_as", etc. fields from each recipient array member into the main event.
The idea would be that one incoming event with two recipients would result in two separate log events in logstash - one for each person.
I've looked at split for events, but I'm not seeing how to do this - I can't seem to find any appropriate examples anywhere.
So, for a real-world example, given this:
{
"@timestamp" => "2014-05-06T09:29:47.048Z",
"time_unix" => 1399368587.048,
"time_iso_week_date" => "2014-W19-2",
"partition" => "19",
"type" => "amavis",
"host" => "mailer.example.net",
"queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
"recipients" => [
{ "action" => "PASS",
"ccat_main" => "Clean",
"queued_as" => "3gNFyR4Mfjzc3",
"rcpt_is_local" => false,
"rcpt_to" => "[email protected]",
"smtp_code" => "250",
"smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
"spam_score" => -2.0
},
{ "action" => "PASS",
"ccat_main" => "Clean",
"mail_id_related" => "men7HTERZaOF",
"penpals_age" => 1114599,
"queued_as" => "3gNFyR4n6Lzc4",
"rcpt_is_local" => true,
"rcpt_to" => "[email protected]",
"smtp_code" => "250",
"smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
"spam_score" => -5.272
}
],
"smtp_code" => ["250"],
}
I'd like to end up with two different events, like these:
{
"@timestamp" => "2014-05-06T09:29:47.048Z",
"time_unix" => 1399368587.048,
"time_iso_week_date" => "2014-W19-2",
"partition" => "19",
"type" => "amavis",
"host" => "mailer.example.net",
"queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
"action" => "PASS",
"ccat_main" => "Clean",
"queued_as" => "3gNFyR4Mfjzc3",
"rcpt_is_local" => false,
"rcpt_to" => "[email protected]",
"smtp_code" => "250",
"smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
"spam_score" => -2.0,
"smtp_code" => ["250"],
}
and
{
"@timestamp" => "2014-05-06T09:29:47.048Z",
"time_unix" => 1399368587.048,
"time_iso_week_date" => "2014-W19-2",
"partition" => "19",
"type" => "amavis",
"host" => "mailer.example.net",
"queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
"action" => "PASS",
"ccat_main" => "Clean",
"mail_id_related" => "men7HTERZaOF",
"penpals_age" => 1114599,
"queued_as" => "3gNFyR4n6Lzc4",
"rcpt_is_local" => true,
"rcpt_to" => "[email protected]",
"smtp_code" => "250",
"smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
"spam_score" => -5.272,
"smtp_code" => ["250"],
}
EDIT:
Okay, I simply used the split filter - I should have seen that. But there's one thing that's confusing me.
When there's a single recipient, it passes the block right through - the result in kibana looks like:
recipients {
"action": "PASS",
"bypass_banned_checks": true,
"bypass_spam_checks": true,
"ccat_main": "Clean",
"queued_as": "3qv7Km4Ybpz14Kyh",
"rcpt_is_local": true,
"rcpt_to": "[email protected]",
"rid": "552213780",
"smtp_code": "250",
"smtp_response": "250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Km4Ybpz14Kyh"
}
But when there are 2 or more recipients, the new events each look like this, with their appropriate info:
recipients.action PASS
recipients.ccat_main CleanTag
recipients.queued_as 3qv7Ly4Pqvz4wyS
recipients.rcpt_is_local true
recipients.rcpt_to [email protected]
recipients.rid 552278239
recipients.smtp_code 250
recipients.smtp_response 250 2.0.0 from MTA(smtp:[127.0.0.1]:10025): 250 2.0.0 Ok: queued as 3qv7Ly4Pqvz4wyS
recipients.whitelisted true
Why the difference between the two? I think I'd prefer to keep the recipients field as a hash of values, so what's the best way to make the split events consistent with the single event?
Upvotes: 1
Views: 1880
Reputation: 11
Here's what I ended up doing. It makes the result consistent whether the array being split has one member or several.
There's probably an easier way to do it, but this is covering me for now. If I come up with something else, I'll come back and revise.
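filter {
  # Multi-recipient events: emit one event per array member, storing the
  # member in "recipcopy" and dropping the original array.
  split {
    field => "recipients"
    target => "recipcopy"
    remove_field => "recipients"
  }
}
filter {
  # Single-recipient events pass through the split unchanged, so "recipients"
  # is still a one-element array here; copy its only member by hand.
  if [recipients] {
    ruby {
      # Pre-5.0 event API. Logstash 5.0+ needs
      # event.set('recipcopy', event.get('recipients')[0]) instead.
      code => "event['recipcopy'] = event['recipients'][0]"
      remove_field => "recipients"
    }
  }
}
filter {
  # In both cases, move the single hash back under the original field name.
  if [recipcopy] {
    mutate {
      rename => { "recipcopy" => "recipients" }
    }
  }
}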
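To make the intent of the three filters easier to check, here is a minimal plain-Ruby sketch of the same normalization. It is only a model: ordinary Hashes stand in for Logstash events, `split_recipients` is a hypothetical helper name rather than anything Logstash provides, and the sample values are borrowed from the question.

```ruby
# Model of the filter chain above using plain Hashes (not the Logstash Event API).
# split_recipients is a hypothetical helper used only for illustration.
def split_recipients(event)
  recipients = event["recipients"]
  # Nothing to split: hand the event back unchanged.
  return [event] if recipients.nil? || recipients.empty?
  # Treat a lone hash the same way as a one-element array.
  recipients = [recipients] unless recipients.is_a?(Array)
  # One copy of the event per recipient, with "recipients" always a single hash.
  recipients.map { |rcpt| event.merge("recipients" => rcpt) }
end

single = {
  "host" => "mailer.example.net",
  "recipients" => [{ "action" => "PASS", "queued_as" => "3qv7Km4Ybpz14Kyh" }]
}
double = {
  "host" => "mailer.example.net",
  "recipients" => [
    { "action" => "PASS", "queued_as" => "3gNFyR4Mfjzc3" },
    { "action" => "PASS", "queued_as" => "3gNFyR4n6Lzc4" }
  ]
}

one = split_recipients(single)
two = split_recipients(double)
```

With this model, both `one` and `two` contain events whose `recipients` value is a hash, which is the consistency the filter chain is after.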
Upvotes: 0
Reputation: 16362
That's the split filter. In each copy, you would then rename fields to be at the right level, or remove the fields you didn't want in each copy.
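As a rough illustration of "rename fields to be at the right level", the sketch below models the split plus promotion in plain Ruby. It assumes, as the question asks, that each recipient's fields should replace any top-level field of the same name. `flatten_recipients` is a hypothetical helper operating on Hashes, not a Logstash filter or API, and the sample data is a trimmed version of the event in the question.

```ruby
# Plain-Ruby model: split on "recipients", then promote each recipient's
# fields to the top level. flatten_recipients is a hypothetical helper.
def flatten_recipients(event)
  recipients = event["recipients"]
  # Leave events without a non-empty recipients array untouched.
  return [event] unless recipients.is_a?(Array) && !recipients.empty?
  # Fields shared by every copy: everything except the array itself.
  shared = event.reject { |key, _value| key == "recipients" }
  # On a name clash (e.g. "queued_as"), the recipient's value wins.
  recipients.map { |rcpt| shared.merge(rcpt) }
end

event = {
  "type" => "amavis",
  "host" => "mailer.example.net",
  "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
  "smtp_code" => ["250"],
  "recipients" => [
    { "action" => "PASS", "queued_as" => "3gNFyR4Mfjzc3",
      "smtp_code" => "250", "spam_score" => -2.0 },
    { "action" => "PASS", "queued_as" => "3gNFyR4n6Lzc4",
      "smtp_code" => "250", "spam_score" => -5.272 }
  ]
}

events = flatten_recipients(event)
```

In a real pipeline the equivalent step would live in filters after the split (for example a mutate rename per field, or a ruby filter), and the exact event API depends on the Logstash version in use.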
Upvotes: 1