Jose Antonio

Reputation: 461

Logstash loses input connection to Kafka when Kafka restarts

I have set up an ELK (Elasticsearch + Logstash + Kibana) stack whose data comes from a Kafka installation. Messages written to Kafka topics are consumed by Logstash, which then stores them in Elasticsearch. Each product runs in a separate container inside a Docker Swarm cluster, and everything works fine as long as Kafka does not fail. If I restart the Kafka container, Logstash loses its connection to Kafka and never recovers it. The only way to restore the connection in this situation is to restart Logstash as well.

Here are the logs I get from logstash when kafka restarts:

[2019-11-25T11:42:12,420][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-5, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,420][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-8, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,422][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,425][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-0, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,426][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,427][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-12, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,428][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-9, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,429][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-1, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,431][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,431][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Discovered group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null)
[2019-11-25T11:42:12,432][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-14, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,432][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-10, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-13, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-7, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,434][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-2, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,438][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-3, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,438][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-docker-desktop-6, groupId=logstash] Group coordinator b0679a32c0ac:9092 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
[2019-11-25T11:42:12,481][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-12, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,481][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-1, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-7, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-2, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,484][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-13, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,489][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-6, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,526][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-5, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,526][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-8, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,529][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-9, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,540][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-3, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,577][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-4, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.
[2019-11-25T11:42:12,578][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-docker-desktop-11, groupId=logstash] Connection to node 1001 (b0679a32c0ac/10.0.2.43:9092) could not be established. Broker may not be available.

And here are the Kafka logs concerning Logstash after Kafka restarts:

[2019-11-25 11:43:04,639] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-10-2e6bedb9-e420-4fb9-b00c-e450f2c9bce2 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,641] INFO [GroupCoordinator 1001]: Preparing to rebalance group logstash in state PreparingRebalance with old generation 17 (__consumer_offsets-49) (reason: removing member logstash-docker-desktop-10-2e6bedb9-e420-4fb9-b00c-e450f2c9bce2 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-0-3bb02e48-b4f4-424e-9233-c56e6af44d3b in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-7-fd07dca9-2dd1-4bbc-b2e0-f89973a48dba in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-6-ca63542a-8658-4438-93a2-a9414fe2a641 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-5-028c55cc-a5a4-4339-add0-57a059191675 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,643] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-1-7e18cf0b-ecfa-4bc4-a517-d1a89168c092 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-14-c799054a-bdd0-4903-8926-e6044271a553 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-13-9e802762-fae6-4983-a031-0d1bc867fcb0 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-3-4bee5bf1-12a7-42ee-b785-93dfec6726dd in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-11-65fa14aa-1ec8-49d4-a840-e6b796352660 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,644] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-4-de8eaf3d-baf4-425f-a1a9-e4378b759fe4 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-12-f8b42828-fb86-4000-92f9-18c0075ac2f7 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-2-4389c233-c115-419d-bd8c-4161eeebc1c6 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-8-dc5c88bc-dde4-4628-955e-ab87b2a70376 in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,645] INFO [GroupCoordinator 1001]: Member logstash-docker-desktop-9-4dcbfb2a-af32-45f0-bd02-dff26f9655dc in group logstash has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-11-25 11:43:04,647] INFO [GroupCoordinator 1001]: Group logstash with generation 18 is now empty (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)

Configuration of logstash in logstash.conf looks like this:

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    group_id => "logstash"
    topics_pattern => ".*"
    consumer_threads => 15
    metadata_max_age_ms => "30000"
    decorate_events => true
    codec => json
  }
}

And the Docker service for Kafka is defined as follows:

  kafka:
    image: wurstmeister/kafka:2.12-2.3.0
    depends_on:
      - zookeeper
    networks:
      - internal
    ports:
      - 9094:9094
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
      KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: 'true'
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094,PLAINTEXT://:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:8088,PLAINTEXT://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/kafka-logs
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 2
      JMX_PORT: 9997

Could someone tell me whether Logstash or Kafka can be configured differently so that Logstash recovers its connection to Kafka automatically, without restarting Logstash?

Thanks in advance!

Upvotes: 0

Views: 1848

Answers (1)

Ali Ben Zarrouk

Reputation: 2018

I see you didn't specify reconnect_backoff_ms in your kafka input (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-reconnect_backoff_ms). I guess that's what you're missing.
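
As a rough sketch of that suggestion, the input block from the question could be extended with reconnect_backoff_ms as shown here. The values are illustrative assumptions, not tuned recommendations, and retry_backoff_ms is a second option of the same plugin that is added here only as a possibility. Whether this is enough to survive a broker restart in a Swarm setup is something you would need to test.

```
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    group_id => "logstash"
    topics_pattern => ".*"
    consumer_threads => 15
    metadata_max_age_ms => "30000"
    # Assumed value: wait 1 s before attempting to reconnect to a broker.
    reconnect_backoff_ms => "1000"
    # Assumed value: wait 1 s before retrying a failed fetch request.
    retry_backoff_ms => "1000"
    decorate_events => true
    codec => json
  }
}
```

The values are quoted to match metadata_max_age_ms in the question. Depending on the plugin version, these options may be declared as strings or as numbers, so check the documentation for the version you run.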

Upvotes: 1
