Dexter

Reputation: 1750

Kafka Streams Application died with "StreamsException: Could not create internal topics."

I am evaluating Kafka Streams. I wrote a simple application and left it running overnight on 2 instances, with 1 stream thread per instance, against a 2-broker Kafka cluster.

The StreamsConfig:

private Map<String, Object> settings() {
    Map<String, Object> settings = new HashMap<>();
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "fare_tracker");
    settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverAddress + ":" + serverPort);
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
    settings.put(StreamsConfig.STATE_DIR_CONFIG, directoryName);
    settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, AvroTimeStampExtractor.class);
    settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
    settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);

    settings.put("schema.registry.url", "http://zookeeper1:8081");

    settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "snappy");
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3);
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500);

    return settings;
}

It died roughly 12 hours after starting, with the stack trace below:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:81)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:628)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:382)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:343)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:501)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:451)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:433)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

I found some relevant WARN and ERROR logs.

{
  "@timestamp": "2017-06-07T05:44:26.996+05:30",
  "@version": 1,
  "message": "Got error produce response with correlation id 198191 on topic-partition fare_tracker-small_window-changelog-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION",
  "logger_name": "org.apache.kafka.clients.producer.internals.Sender",
  "thread_name": "kafka-producer-network-thread | fare_tracker-9e0a04f4-c1cc-4b61-8ca5-8bf25f18549f-StreamThread-1-producer",
  "level": "WARN",
  "level_value": 30000
}

This seems like a generic network issue; I have the producer configured with 3 retries.
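For perspective, here is a rough sketch of how small an outage window those retry settings actually cover. The helper below is hypothetical (not part of the application); it only multiplies the back-off by the retry count, ignoring the per-attempt `request.timeout.ms`, which also contributes:

```java
public class RetryWindow {
    // Worst-case back-off budget of the producer settings above:
    // 3 retries x 500 ms of retry.backoff.ms between attempts.
    static long backoffBudgetMs(int retries, long retryBackoffMs) {
        return retries * retryBackoffMs;
    }

    public static void main(String[] args) {
        System.out.println(backoffBudgetMs(3, 500L) + " ms"); // prints "1500 ms"
    }
}
```

So the producer gives up after roughly 1.5 seconds of back-off, while the logs show the network problem lasted much longer (the NETWORK_EXCEPTION appears at 05:44 and the application died around 06:20).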

Application 1 died with the following logs:

{
  "@timestamp": "2017-06-07T06:20:35.122+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Failed to commit StreamTask 2_61 state: ",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000,
  "stack_trace": org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
}

{
  "@timestamp": "2017-06-07T06:20:35.236+05:30",
  "@version": 1,
  "message": "Bootstrap broker kafka2:9092 disconnected",
  "logger_name": "org.apache.kafka.clients.NetworkClient",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:36.100+05:30",
  "@version": 1,
  "message": "Could not create internal topics: Found only 1 brokers,  but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
  "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:36.914+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}

Application 2 died with the following logs:

{
  "@timestamp": "2017-06-07T06:20:06.254+05:30",
  "@version": 1,
  "message": "Could not create internal topics: Found only 1 brokers,  but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
  "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:07.041+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}

I checked my other applications, and although they are running fine, I saw the following log multiple times at around the same time as the errors above.

{
  "@timestamp": "2017-06-07T06:10:34.962+05:30",
  "@version": 1,
  "message": "Publishing to kafka failed ",
  "thread_name": "kafka-producer-network-thread | producer-1",
  "level": "ERROR",
  "level_value": 40000,
  "stack_trace": org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:255)
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
    at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
}

Upvotes: 1

Views: 4378

Answers (1)

miguno

Reputation: 15057

You said you have 2 Kafka brokers, but one of the error messages includes the following information:

Could not create internal topics: Found only 1 brokers, but replication factor is 2.

It looks like you have network connectivity issues between your application(s) and the Kafka brokers (and perhaps also between the Kafka brokers themselves). If such a network issue persists long enough, applications that communicate with the brokers will eventually fail, depending on their timeout and retry settings.
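If you expect transient broker outages, one mitigation is to give the embedded clients a much larger retry budget so they ride out the outage instead of exhausting 3 retries in about 1.5 seconds. The sketch below is an illustration, not a verified fix for your cluster; it uses the plain string keys that the `StreamsConfig.producerPrefix(...)` calls in your settings ultimately produce:

```java
import java.util.HashMap;
import java.util.Map;

public class ResilientSettings {
    // Sketch of producer overrides for tolerating longer broker outages.
    // The "producer." prefix is what StreamsConfig.producerPrefix(...) prepends;
    // the concrete values here are illustrative, not tuned recommendations.
    static Map<String, Object> resilientOverrides() {
        Map<String, Object> s = new HashMap<>();
        s.put("producer.retries", Integer.MAX_VALUE);  // effectively retry until the outage ends
        s.put("producer.retry.backoff.ms", 1000);      // wait longer between attempts
        s.put("producer.request.timeout.ms", 60000);   // tolerate slow broker responses
        return s;
    }
}
```

Note that a large `retries` value can reorder records unless `max.in.flight.requests.per.connection` is set to 1, and none of this helps if a broker stays down for good: with `replication.factor` 2 and only 1 live broker, internal topic creation will still fail, so the real fix is restoring broker connectivity.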

Upvotes: 1
