Danish Bin Sofwan

Reputation: 476

Logstash output different fields to different elastic search indices

I have a Filebeat instance that sends Apache access logs to Logstash. The Logstash pipeline transforms the input and loads the processed fields, say (field1, field2 & field3), into Elasticsearch, into an index named indexA. The flow is simple and working. Here's my pipeline.conf:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "source"
        remove_field => "type"
        remove_field => "tags"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}

Now what I want to do is add two other fields, field4 and field5, and load them into a separate index named indexB. So at the end indexA holds field1, field2 and field3, while indexB holds field4 and field5.

So far this is the modified pipeline.conf, which doesn't seem to work:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "type"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }   
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}
filter
{
    mutate
    {
        add_field => { "field4" => "%{source}" }
        add_field => { "field5" => "%{tags}" }
        remove_field => "field1"
        remove_field => "field2"
        remove_field => "field3"
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexB"
    }
}   

Can someone please point out where I am going wrong, or suggest an alternative solution?

Upvotes: 0

Views: 1637

Answers (1)

Val

Reputation: 217254

You need to duplicate your events using the clone filter. Then you can add the desired fields to each respective clone and send them to two different Elasticsearch indices:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "type"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
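    # clone emits one copy of the event per name listed below and sets that
    # copy's "type" field to the clone name ("log1" or "log2")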
    clone {
        clones => ["log1", "log2"]
    }
    if [type] == "log1" {
        mutate
        {
            add_field => { "field1" => "%{access_time}" }
            add_field => { "field2" => "%{host}" }
            add_field => { "field3" => "%{read_timestamp}" }
        }
    } else {   
        mutate
        {
            add_field => { "field4" => "%{source}" }
            add_field => { "field5" => "%{tags}" }
        }
    }
}
output {
    if [type] == "log1" {
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexA"
        }
    } else {   
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexB"
        }
    }
}   
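
One caveat worth noting: the clone filter also passes the original, un-cloned event through, and since that event carries neither type log1 nor log2, it falls into the else branches above and gets indexed into indexB as well. If only the two clones should be indexed, a minimal sketch (my addition, not part of the config above) is to drop the original right after the clone filter:

    # assumed addition inside the filter section, directly after clone:
    if [type] != "log1" and [type] != "log2" {
        drop { }    # discard the original, un-cloned event
    }

If the temporary type field shouldn't appear in the indexed documents, it can also be removed with a mutate { remove_field => "type" } inside each conditional branch before the output stage.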

Upvotes: 2
