Reputation: 476
I have a Filebeat instance that sends Apache access logs to Logstash. The Logstash pipeline transforms the log lines and loads the processed fields (say field1, field2 & field3) into Elasticsearch, into an index indexA. The flow is simple and working. Here's my pipeline.conf:
input {
  beats {
    port => "5043"
  }
}
filter {
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => { "message" => [
      "%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
      "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"
    ] }
    remove_field => ["@version", "beat", "input_type", "source", "type", "tags", "http_version", "@timestamp", "message"]
  }
  mutate {
    add_field => { "field1" => "%{access_time}" }
    add_field => { "field2" => "%{host}" }
    add_field => { "field3" => "%{read_timestamp}" }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "indexA"
  }
}
Now what I want to do is add two other fields, field4 and field5, and send them to a separate index named indexB. So in the end, indexA holds field1, field2 and field3, while indexB holds field4 and field5.
So far this is the modified pipeline.conf, which doesn't seem to work:
input {
  beats {
    port => "5043"
  }
}
filter {
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => { "message" => [
      "%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
      "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"
    ] }
    remove_field => ["@version", "beat", "input_type", "type", "http_version", "@timestamp", "message"]
  }
  mutate {
    add_field => { "field1" => "%{access_time}" }
    add_field => { "field2" => "%{host}" }
    add_field => { "field3" => "%{read_timestamp}" }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "indexA"
  }
}
filter {
  mutate {
    add_field => { "field4" => "%{source}" }
    add_field => { "field5" => "%{tags}" }
    remove_field => ["field1", "field2", "field3"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "indexB"
  }
}
Can someone please point out where I'm going wrong, or suggest an alternative solution?
Upvotes: 0
Views: 1637
Reputation: 217254
You need to duplicate your events using the clone filter; then you can add the desired fields to each respective clone and sink them into two different ES indices. (Your modified config doesn't work because Logstash concatenates all filter and output sections of a config into a single pipeline, so every event passes through both filter blocks and reaches both outputs.) One thing to watch for: the clone filter emits the original event in addition to the clones, so the config below drops any event whose type is neither log1 nor log2:
input {
  beats {
    port => "5043"
  }
}
filter {
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => { "message" => [
      "%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
      "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[time]}\] \"-\" %{NUMBER:[response_code]} -"
    ] }
    remove_field => ["@version", "beat", "input_type", "type", "http_version", "@timestamp", "message"]
  }
  # create two copies of each event; each clone gets its type field
  # set to "log1" or "log2" respectively
  clone {
    clones => ["log1", "log2"]
  }
  # the clone filter also passes the original event through unchanged;
  # drop it so each log line yields exactly one document per index
  if [type] != "log1" and [type] != "log2" {
    drop {}
  }
  if [type] == "log1" {
    mutate {
      add_field => { "field1" => "%{access_time}" }
      add_field => { "field2" => "%{host}" }
      add_field => { "field3" => "%{read_timestamp}" }
    }
  } else {
    mutate {
      add_field => { "field4" => "%{source}" }
      add_field => { "field5" => "%{tags}" }
    }
  }
}
output {
  if [type] == "log1" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "indexA"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "indexB"
    }
  }
}
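If you want to verify the routing while testing, one option (just a debugging sketch, not part of the required solution) is a temporary stdout output with the rubydebug codec; since a bare output block receives every event, it can sit alongside the conditional elasticsearch outputs:
output {
  # temporary debug sink: prints each event together with its type, so you
  # can confirm that log1 events carry field1-field3 and log2 events carry
  # field4/field5 before anything reaches Elasticsearch
  stdout { codec => rubydebug }
}
Remove it once both indices look right.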
Upvotes: 2