Nandish Kotadia

Reputation: 461

Kafka Stream Startup Issue - org.apache.kafka.streams.errors.LockException

I have a Kafka Streams application (version 0.11) that takes data from a few topics, joins it, and writes the result to another topic.

Kafka Configuration:

5 kafka brokers - version 0.11
Kafka Topics - 15 partitions and 3 replication factor.

A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, the application throws the exception below:

org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
    at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

I have read in a few JIRA issues that cleaning up the streams state might fix the issue. But is calling cleanUp every time we start the Kafka Streams application a proper solution or just a patch? Also, won't cleanUp delay application startup?

Note: Do I need to call streams.cleanUp() before calling streams.start() each time I start the Kafka Streams application?

Upvotes: 0

Views: 2262

Answers (1)

Matthias J. Sax

Reputation: 62285

Seeing org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10 is actually expected and should resolve itself. The thread backs off, waits for the other thread to release the lock, and retries later. Thus, you might even see this WARN message in the logs multiple times if a retry happens before the second thread has released the lock.

However, eventually the second thread should release the lock and the first thread will be able to acquire it. Afterwards, Streams should just move forward. Note that it's a WARN message and not an error.
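
To illustrate the back-off-and-retry behaviour described above, here is a minimal, self-contained sketch in plain Java. It is not Kafka's actual implementation: the class LockRetrySketch, the method acquireWithBackoff, the ReentrantLock standing in for the state directory lock, and all timing values are assumptions made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class LockRetrySketch {

    // Hypothetical stand-in for the per-task state directory lock.
    static final ReentrantLock stateDirLock = new ReentrantLock();

    /**
     * Tries to take the lock, doubling the wait after each failed attempt.
     * Returns the number of attempts used, or -1 if it gave up or was interrupted.
     */
    static int acquireWithBackoff(ReentrantLock lock, long initialBackoffMs, int maxAttempts) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lock.tryLock()) {
                return attempt;
            }
            // Comparable in spirit to the WARN line in the question: not fatal, just retry.
            System.out.println("WARN could not lock state directory, retrying in " + backoffMs + " ms");
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
            backoffMs *= 2;
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch held = new CountDownLatch(1);

        // The "second thread" that still owns the lock for a short while.
        Thread previousOwner = new Thread(() -> {
            stateDirLock.lock();
            try {
                held.countDown();
                Thread.sleep(120);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stateDirLock.unlock();
            }
        });
        previousOwner.start();
        held.await(); // make sure the lock is really taken before we try

        int attempts = acquireWithBackoff(stateDirLock, 50, 10);
        if (attempts > 0) {
            System.out.println("lock acquired after " + attempts + " attempt(s)");
            stateDirLock.unlock();
        }
        previousOwner.join();
    }
}
```

In this sketch the first attempts print a warning and the thread simply tries again later, which mirrors why repeated WARN lines during a rebalance are not by themselves a sign that state needs to be wiped.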

Upvotes: 1
