Dario R
Dario R

Reputation: 63

Issue with conditionals in logstash with fields from Kafka ----> FileBeat prospectors

I have the following scenario:

FileBeat ----> Kafka -----> Logstash -----> Elastic ----> Kibana

In Filebeat I have 2 prospectors the in YML file,,. and I add some fields to identify the log data. But, the issue is: in Logstash I haven't be able to validate this fields.

The configuration files are:

1. filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /opt/jboss/server.log*
  tags: ["log_server"]
  fields:
    environment: integracion
    log_type: log_server

  document_type: log_server
  fields_under_root: true


- input_type: log
  paths:
    - /var/todo1_apps/ebanTX.log*
  tags: ["log_eban"]
  fields:
    environment: integracion
    log_type: log_ebanking

  document_type: log_ebanking
  fields_under_root: true

output.kafka:
    enabled: true
    hosts: ["192.168.105.68:9092"]
    topic: "sve_logs"
    timeout: 30s

2. logstash.conf

input {
  kafka {
    bootstrap_servers => "192.xxx.xxx.xxx:9092"
    group_id => "sve_banistmo"
    topics => ["sve_logs"]
    decorate_events => true
    codec => "plain"
    }
  }

filter {
if [type] == "log_ebanking" {
    grok {
       patterns_dir => ["patterns/patterns"]
        match => { "message" => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}]%{SPACE}-%{SPACE}%{GREEDYDATA:message_log}" }
        }
  }
}

output {
if [type] == "log_ebanking" {
      elasticsearch {
        hosts => ["192.168.105.67:9200"]
        index => "sve-banistmo-ebanking-%{+YYYY.MM.dd}"
      }
        stdout { codec => json}
  }
}

The problem is in the conditional filter and output section. I've tried with

@[metadata][type]
@metadata][type]
@metadata.type
metadata.type
[type]

With both the type and log_type variable. Nothing works !! :S If I don't put conditionals, the data flow without problem. I mean, is not a conection issue.

Please help me. I've reviewed all the information related, but in my case the conditional doesn't work.

Thanks in advance

Dario R

Upvotes: 1

Views: 2049

Answers (4)

bokzor
bokzor

Reputation: 445

You can access to a custom field this way :

  if [kafka][fields][log_type] == "nginx_access" {
    grok {
        patterns_dir => "/etc/logstash/patterns"
        match => { "message" => "%{nginx_access}" }
    }
  }

Upvotes: 0

Zee
Zee

Reputation: 172

Been through this before with Kafka! Here are the steps I have done to make it work:

  1. Update your Kafka input plugin cd /usr/share/logstash/bin then ./logstash-plugin update logstash-input-kafka
  2. In yout LS config file add this to your Kakfa input plugin codec => "json"
  3. keep you filter empty for now, as first you need to make sure you are receving the data in JSON on Elasticsearch. If you have Kibana check there or run ES queries if you dont.
  4. then you should be able to access any fields anywhere in your LS config files.

Now for your output I see you are adding "log_type" to your events in filebeat, then I would suggest in your LS output plugin you do if "log_ebanking" == [log_type].

The field [type] is by default "logs" for filebeat and "metricsets" for metricbeat for all events.

Not sure what is your Filebeat version but look at this as document_type is deprecated in 5.5 https://www.elastic.co/guide/en/beats/filebeat/current/configuration-filebeat-options.html#filebeat-document-type

Upvotes: 0

sharonpearl
sharonpearl

Reputation: 21

Use codec => "json" to extract all fields from the message in logstash.conf kafka input conf.

Upvotes: 1

Shengyue
Shengyue

Reputation: 31

The problem is the message from kafka is not decoded. Logstash will take the whole json message reported by filebeat as the message. You can add json filter to decode the json format message.

filter {
  json {
    source => "message"
  }
}

The fields will be decoded. The message field will be replaced with the real message, rather than the whole json string.

Then you can use [type] in your conditional blocks. And the @metadata is not reported by filebeat when using kafka as output. So you cannot see @metadata.

Upvotes: 1

Related Questions