Keith Jensen

Reputation: 3

Are conditionals supported in a kafka output plugin?

I have the following logstash kafka output plugin with a conditional in it that produces an error from logstash when starting the pipeline.

Are conditionals supported in a kafka output filter?

output {
  kafka {
    id => "plugin_SharedAlarmCreated"
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    codec => json
    topic_id => "Shared.Event.AlarmCreated"
    if "null-value" in [tags] {
      message_key => "%{Source}+%{Id}+NULL"
    }
    else {
      message_key => "%{Source}+%{Id}+%{SourceId}"
    }
  }
}

[2022-10-15T16:37:35,971][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "=>" at line 30, column 8 (byte 726) after output {\r\n kafka {\r\n id => "plugin_SharedAlarmCreated"\r\n bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"\r\n codec => json\r\n topic_id => "Shared.Event.AlarmCreated"\r\n if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in block in converge_state'"]}

Upvotes: 0

Views: 276

Answers (2)

Badger

Reputation: 4072

No, you cannot use a conditional inside a plugin definition. You can wrap the plugin in a conditional in the output section:

output {
    if "null-value" in [tags] {
        kafka {
            id => "plugin_SharedAlarmCreated1"
            bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
            codec => json
            topic_id => "Shared.Event.AlarmCreated"
            message_key => "%{Source}+%{Id}+NULL"
        }
    } else {
        kafka {
            id => "plugin_SharedAlarmCreated2"
            bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
            codec => json
            topic_id => "Shared.Event.AlarmCreated"
            message_key => "%{Source}+%{Id}+%{SourceId}"
        }
    }
}

However, if you have multiple outputs you will have multiple connections to Kafka, so it might be better to use a conditional in the filter section to build the key and then reference it from a single output.

filter {
    if "null-value" in [tags] {
      mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+NULL" } }
    }
    else {
      mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+%{SourceId}" } }
    }
}

output {
  kafka {
    id => "plugin_SharedAlarmCreated"
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    codec => json
    topic_id => "Shared.Event.AlarmCreated"
    message_key => "%{[@metadata][message_key]}"
  }
}

Upvotes: 1

Keith Jensen

Reputation: 3

I had already tried your 1st suggestion and it failed with a duplicate plugin_id error. I guess I could use two distinct plugin_id values but I like your 2nd suggestion better. I tried it and still got an error.

I have a ruby filter that creates the null-value tag. Here is my complete filter section.

filter {
  ruby {
    code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"
  }

  if "null-value" in [tags] {
    [@metadata][message_key] => "%{Source}+%{Id}+NULL"
  }
  else {
    [@metadata][message_key] => "%{Source}+%{Id}+%{SourceId}"
  }
}

[2022-10-15T18:46:54,491][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "if", [A-Za-z0-9_-], '"', "'", "}" at line 24, column 5 (byte 560) after filter {\r\n ruby {\r\n code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"\r\n }\r\n\r\n if "null-value" in [tags] {\r\n ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in block in converge_state'"]}

Line 24 is the closing curly brace for the "if" conditional.
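
For what it's worth, the parser message suggests that the body of a conditional in the filter section has to contain plugins (or nested conditionals), so a bare `[@metadata][message_key] => "..."` line is rejected. A minimal sketch of the filter section with the assignment wrapped in mutate's add_field, as in the answer above, might look like the following. Two assumptions of mine: the field reference is quoted as the hash key, and the ruby code uses event.tag so the tag is appended instead of replacing whatever is already in tags.

```
filter {
  ruby {
    # Assumption: append the tag rather than overwrite the tags field
    code => "event.tag('null-value') if event.get('SourceId').nil?"
  }

  if "null-value" in [tags] {
    # The assignment lives inside a mutate plugin, not directly in the conditional
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+NULL" } }
  }
  else {
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+%{SourceId}" } }
  }
}
```

The kafka output can then stay as a single plugin with message_key => "%{[@metadata][message_key]}". Since [@metadata] fields are not included in the output event, the key should not end up in the JSON payload.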

Upvotes: 0
