Samy
Samy

Reputation: 311

Unable to create state store in Kafka streams

I am getting this error Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1 while creating state store in my kafka streams application. Here is the complete stack trace of the application

[2016-08-30 12:43:09,408] ERROR [StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group string-monitor failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    ... 1 more
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more

And I create a state store as below

 StateStoreSupplier avgStore = Stores.create("avgStore")
          .withKeys(Serdes.String())
          .withValues(Serdes.String())
          .persistent()
          .build();

Any idea of how to fix this?

Upvotes: 4

Views: 11602

Answers (3)

Stanislau
Stanislau

Reputation: 432

Just a note for those who receives the same Failed to lock the state directory exception and uses The Spring for Apache Kafka (spring-kafka) (example from spring doc):

It took me a while to figure out in the source code that Spring starts your built Steams automatically, so there is no need for you to do manually something like:

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

I dit it and faced the aforementioned exception because, not surpassingly, there were two Application instances running: one is created by Spring, another by me

Upvotes: 1

robert towne
robert towne

Reputation: 191

I also saw this problem when the user didn't have permissions to write to the default state.dir

When I changed the following property to a dir w/good permissions everything was fine:

property.put(StreamsConfig.STATE_DIR_CONFIG, "{goodDir}")

This was observed in 0.10.2

Upvotes: 3

Guozhang Wang
Guozhang Wang

Reputation: 489

Did you configure multiple threads within your application instance? If yes, it may be due to a known issue in older versions of Kafka, where the underlying consumer used by Kafka Streams (in the app instance) can take too long to rebalance, causing itself to be kicked out of the consumer group (and hence triggering another consumer group rebalance) while it is still under the first rebalance process.

The following error messages in your stacktrace indicate that you are actually running into the problem I described above:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager

The problem is summarized in this Apache Kafka ticket:

https://issues.apache.org/jira/browse/KAFKA-3758

A recent change to the underlying Kafka consumer client fixes this issue:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

However it is not included in an official Kafka release yet and, as of today, is available only in Apache Kafka trunk. If you are able to run your app with Kafka trunk you can verify if this problem has gone away already.

Upvotes: 4

Related Questions