In my Logstash setup, I fetch data from an API and publish it over MQTT. Occasionally the MQTT connection is interrupted, and when that happens the pipeline stops working until I restart the Logstash container. The logs show that the pipeline has terminated; after I restart the container, it starts working again. What can I do to handle this situation? I tried adding a healthcheck to restart the pipeline automatically or to verify the MQTT connection, but without success.
In short: when the connection problem occurs, the pipeline terminates. How can I trigger a restart of the pipeline?
docker-compose.yml file:
logstash:
  build: ./logstash
  image: docker.elastic.co/logstash/logstash/wins:8.8.2
  container_name: logstash_hktm_multi
  volumes:
    - type: bind
      source: ./logstash/config/logstash.yml
      target: /usr/share/logstash/config/logstash.yml
      read_only: true
    - type: bind
      source: ./logstash/config/pipelines.yml
      target: /usr/share/logstash/config/pipelines.yml
      read_only: true
    - type: bind
      source: ./logstash/pipeline
      target: /usr/share/logstash/pipeline
      read_only: true
    - type: bind
      source: ./logstash/data
      target: /var/lib/logstash/data
  ports:
    - 5044:5044
    - 5000:5000/tcp
    - 5000:5000/udp
    - 9600:9600
  command: --config.reload.automatic
  environment:
    LS_JAVA_OPTS: -Xmx1g -Xms1g
    LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    TZ: Europe/Istanbul
    start_date: '1900-01-01'
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9600"]
    interval: 3m
    timeout: 60s
    retries: 5
  networks:
    - elk
  depends_on:
    - elasticsearch
  restart: always
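The healthcheck above only confirms that the Logstash API on port 9600 answers, which can still be true after a single pipeline has stopped. A stricter check could compare the pipeline's `events.out` counter from the node stats API at two points in time and treat "no new events" as a stall. The following is only a rough sketch under several assumptions: the pipeline id `solaredge` is taken from the error log in this post, the wait time and the script itself are hypothetical, and it would have to run somewhere Python is available (the host or a sidecar, not necessarily the Logstash image).

```python
import json
import sys
import time
import urllib.request

# Assumption: a bit more than two cycles of the 1-minute http_poller schedule.
WAIT_SECONDS = 150


def events_out(stats, pipeline_id):
    """Return the pipeline's events.out counter, or None if it is absent."""
    pipeline = stats.get("pipelines", {}).get(pipeline_id)
    if not pipeline:
        return None
    return pipeline.get("events", {}).get("out")


def is_stalled(before, after):
    """Treat the pipeline as stalled if it is missing or emitted nothing new."""
    if before is None or after is None:
        return True
    return after <= before


def fetch_stats(url):
    """Fetch and decode the node stats JSON from the Logstash API."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def main(url, pipeline_id):
    first = events_out(fetch_stats(url), pipeline_id)
    time.sleep(WAIT_SECONDS)
    second = events_out(fetch_stats(url), pipeline_id)
    # Exit code 1 signals "stalled" to whatever supervises this check.
    return 1 if is_stalled(first, second) else 0


# Runs only when both arguments are supplied on the command line.
if __name__ == "__main__" and len(sys.argv) == 3:
    sys.exit(main(sys.argv[1], sys.argv[2]))
```

It could be invoked as `python check_pipeline.py http://localhost:9600/_node/stats/pipelines/solaredge solaredge` (the file name is made up). A non-zero exit code, or an exception while fetching, would indicate a problem. Whatever runs the check would still need to restart the container itself, because as far as I understand `restart: always` reacts to the container exiting, not to its health status.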
logstash.pipeline:
input {
  http_poller {
    urls => {
      dummy_request => {
        method => get
        url => "http://localhost:9600"
      }
    }
    request_timeout => 60
    codec => "json"
    schedule => { "every" => "1m" }
  }
}
filter {
  ruby {
    code => 'event.set("date", Time.now.strftime("%Y-%m-%d"))'
  }
  ruby {
    code => 'logger.info("Sistem saati: #{Time.now.strftime("%Y-%m-%d %H:%M:%S")}")'
  }
  http {
    url => "https:/xxx"
    target_body => "response"
  }
  ruby {
    code => "event.set('total_energy', event.get('[response][energy][values][0][value]') || 0)"
  }
  mutate {
    remove_field => [ "@version","event", "id", "name","ephemeral_id","status","snapshot", "response", "version","http_address","pipeline","monitoring","build_date","build_sha","build_snapshot", "host", "url", "error", "http", "tags", "dummy_request" ]
    add_field => { "isIncrement" => "true" }
    add_field => { "timeUnit" => "DAY" }
  }
}
output {
  mqtt {
    host => "192.168.2.205"
    port => "1883"
    topic => "ry_GESS"
  }
  stdout {
    codec => rubydebug
  }
}
When the MQTT broker is unreachable, I get the following errors:
[2024-06-25T17:37:10,276][INFO ][logstash.filters.ruby ][solaredge][f936179747d4760133b949bcf0600c72e081a7c5cb61ca44969561336c404f8b] Sistem saati: 2024-06-25 17:37:10
[2024-06-25T17:37:10,522][ERROR][logstash.javapipeline ][solaredge] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"solaredge", :error=>"(NotConnectedException) MQTT::NotConnectedException", :exception=>Java::OrgJrubyExceptions::Exception, :backtrace=>["RUBY.send_packet(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/mqtt-0.6.0/lib/mqtt/client.rb:567)", "RUBY.publish(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/mqtt-0.6.0/lib/mqtt/client.rb:330)", "RUBY.handle_events(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-mqtt-1.2.1/lib/logstash/outputs/mqtt.rb:176)", "RUBY.multi_receive(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-mqtt-1.2.1/lib/logstash/outputs/mqtt.rb:158)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304)"], :thread=>"#<Thread:0x30c8f4a8 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}