rumeysa yuk

Logstash MQTT output not reconnecting automatically after disconnect until container restart

In my Logstash installation, I poll an API and publish the data over MQTT. Occasionally the MQTT connection is interrupted, and when that happens the pipeline stops working until I restart the Logstash container. The logs show that the pipeline has terminated. After I restart the container, the pipeline works again. What steps can I take to handle this situation? I tried adding a healthcheck to automatically restart the pipeline or re-establish the MQTT connection, but without success.

When such a connection problem occurs, the pipeline terminates. How do I trigger a restart of the pipeline?

docker-compose.yml file:

  logstash:
    build: ./logstash
    image: docker.elastic.co/logstash/logstash/wins:8.8.2
    container_name: logstash_hktm_multi
    volumes:
      - type: bind
        source: ./logstash/config/logstash.yml
        target: /usr/share/logstash/config/logstash.yml
        read_only: true
      - type: bind
        source: ./logstash/config/pipelines.yml
        target: /usr/share/logstash/config/pipelines.yml
        read_only: true
      - type: bind
        source: ./logstash/pipeline
        target: /usr/share/logstash/pipeline
        read_only: true
      - type: bind
        source: ./logstash/data
        target: /var/lib/logstash/data
    ports:
      - 5044:5044
      - 5000:5000/tcp
      - 5000:5000/udp
      - 9600:9600
    command: --config.reload.automatic 
    environment:
      LS_JAVA_OPTS: -Xmx1g -Xms1g
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
      TZ: Europe/Istanbul
      start_date: '1900-01-01'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9600"]
      interval: 3m
      timeout: 60s
      retries: 5
            
    networks:
      - elk
    depends_on:
      - elasticsearch
    restart: always

logstash.pipeline:

input {
  http_poller {
    urls => {
      dummy_request => {
        method => get
        url => "http://localhost:9600" 
      }
    }
    request_timeout => 60
    codec => "json"
    schedule => { "every" => "1m" }
  }
}

filter {
  ruby {
    code => 'event.set("date", Time.now.strftime("%Y-%m-%d"))'
  }

  ruby {
    code => 'logger.info("Sistem saati: #{Time.now.strftime("%Y-%m-%d %H:%M:%S")}")'
  }

  http {
    url => "https:/xxx"
    target_body => "response"
  }

  ruby {
    code => "event.set('total_energy', event.get('[response][energy][values][0][value]') || 0)"
  }

  mutate {  
    remove_field => [ "@version","event", "id", "name","ephemeral_id","status","snapshot", "response", "version","http_address","pipeline","monitoring","build_date","build_sha","build_snapshot", "host", "url", "error", "http", "tags", "dummy_request" ]
    add_field =>  { "isIncrement" => "true" }
    add_field =>  { "timeUnit" => "DAY" }
  }
}

output {
  mqtt {
    host => "192.168.2.205"
    port => "1883"
    topic => "ry_GESS"
  }
  stdout {
    codec => rubydebug
  }
}

When the MQTT broker is unreachable, I get the following errors:

[2024-06-25T17:37:10,276][INFO ][logstash.filters.ruby    ][solaredge][f936179747d4760133b949bcf0600c72e081a7c5cb61ca44969561336c404f8b] Sistem saati: 2024-06-25 17:37:10
[2024-06-25T17:37:10,522][ERROR][logstash.javapipeline    ][solaredge] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"solaredge", :error=>"(NotConnectedException) MQTT::NotConnectedException", :exception=>Java::OrgJrubyExceptions::Exception, :backtrace=>["RUBY.send_packet(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/mqtt-0.6.0/lib/mqtt/client.rb:567)", "RUBY.publish(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/mqtt-0.6.0/lib/mqtt/client.rb:330)", "RUBY.handle_events(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-mqtt-1.2.1/lib/logstash/outputs/mqtt.rb:176)", "RUBY.multi_receive(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-output-mqtt-1.2.1/lib/logstash/outputs/mqtt.rb:158)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:121)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304)"], :thread=>"#<Thread:0x30c8f4a8 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}
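
For context, the "ensure MQTT connection" part of a healthcheck could be sketched roughly as below. This is only an illustration, not something from my setup: the function name `broker_reachable` is hypothetical, Python is assumed to be available wherever the check runs (I have not verified that the Logstash image ships it), and the probe only confirms that the broker's TCP port accepts connections. It does not perform an MQTT handshake and does not tell whether the Logstash pipeline itself is still running.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper: it checks TCP reachability only, not the MQTT protocol.
    """
    try:
        # create_connection resolves the host and opens a TCP socket;
        # the context manager closes the socket again right away.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

With the values from the mqtt output block above, the call would be `broker_reachable("192.168.2.205", 1883)`; a wrapper script could exit non-zero when it returns `False` so that a healthcheck reports the failure.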
