Reputation: 3
I have the following Logstash kafka output plugin configuration with a conditional inside it. Logstash reports an error when starting the pipeline.
Are conditionals supported inside a kafka output plugin?
output {
  kafka {
    id => "plugin_SharedAlarmCreated"
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    codec => json
    topic_id => "Shared.Event.AlarmCreated"
    if "null-value" in [tags] {
      message_key => "%{Source}+%{Id}+NULL"
    }
    else {
      message_key => "%{Source}+%{Id}+%{SourceId}"
    }
  }
}
[2022-10-15T16:37:35,971][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "=>" at line 30, column 8 (byte 726) after output {\r\n kafka {\r\n id => "plugin_SharedAlarmCreated"\r\n bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"\r\n codec => json\r\n topic_id => "Shared.Event.AlarmCreated"\r\n if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in
initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in
block in converge_state'"]}
Upvotes: 0
Views: 276
Reputation: 4072
No, you cannot use a conditional inside a plugin definition. You can use a conditional in the output section, wrapping two kafka outputs that each have a distinct id:
output {
  if "null-value" in [tags] {
    kafka {
      id => "plugin_SharedAlarmCreated1"
      bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      codec => json
      topic_id => "Shared.Event.AlarmCreated"
      message_key => "%{Source}+%{Id}+NULL"
    }
  } else {
    kafka {
      id => "plugin_SharedAlarmCreated2"
      bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      codec => json
      topic_id => "Shared.Event.AlarmCreated"
      message_key => "%{Source}+%{Id}+%{SourceId}"
    }
  }
}
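However, if you have multiple outputs you will have multiple connections to kafka, so it might be better to use a conditional in the filter section. There you can store the key in a [@metadata] field, which is not included in the event sent by the output, and reference it from a single kafka output: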
filter {
  if "null-value" in [tags] {
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+NULL" } }
  }
  else {
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+%{SourceId}" } }
  }
}
output {
  kafka {
    id => "plugin_SharedAlarmCreated"
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    codec => json
    topic_id => "Shared.Event.AlarmCreated"
    message_key => "%{[@metadata][message_key]}"
  }
}
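While testing, it can help to confirm which key was computed. Fields under [@metadata] are not printed by default, so a sketch like the following, assuming a temporary stdout output is acceptable in the pipeline, makes them visible:

```
output {
  # Temporary debugging output (an addition for illustration only):
  # the rubydebug codec prints [@metadata] fields when metadata => true.
  stdout {
    codec => rubydebug { metadata => true }
  }
}
```

Remove this output again once the key looks right.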
Upvotes: 1
Reputation: 3
I had already tried your 1st suggestion and it failed with a duplicate plugin_id error. I guess I could use two distinct plugin_id values, but I like your 2nd suggestion better. I tried it and still got an error.
I have a ruby filter that creates the null-value tag. Here is my complete filter section.
filter {
  ruby {
    code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"
  }
  if "null-value" in [tags] {
    [@metadata][message_key] => "%{Source}+%{Id}+NULL"
  }
  else {
    [@metadata][message_key] => "%{Source}+%{Id}+%{SourceId}"
  }
}
[2022-10-15T18:46:54,491][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "if", [A-Za-z0-9_-], '"', "'", "}" at line 24, column 5 (byte 560) after filter {\r\n ruby {\r\n code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"\r\n }\r\n\r\n if "null-value" in [tags] {\r\n ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in
initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in
block in converge_state'"]}
Line 24 is the closing curly brace for the "if" conditional.
Upvotes: 0
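The parse error above is consistent with the field assignments sitting directly inside the conditional. Inside a filter block, a conditional can only contain plugin blocks or further conditionals, so the assignment has to be wrapped in a plugin such as mutate. A minimal sketch, assuming the same field names as in the thread and that the [@metadata][message_key] hash key needs to be quoted:

```
filter {
  ruby {
    # Tag events that have no SourceId (unchanged from the post above)
    code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"
  }
  if "null-value" in [tags] {
    # The assignment must live inside a plugin; mutate's add_field does that
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+NULL" } }
  } else {
    mutate { add_field => { "[@metadata][message_key]" => "%{Source}+%{Id}+%{SourceId}" } }
  }
}
```

Note that event.set('tags', ...) replaces any tags already on the event, which may or may not matter in this pipeline.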