Jay Ghiya

Reputation: 504

StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory

This is regarding upgrading an existing production code base that uses windowing, from kafka-clients, kafka-streams and spring-kafka 2.4.0 to 2.6.x, and also upgrading spring-boot-starter-parent from 2.2.2.RELEASE to 2.3.x, since Spring Boot 2.2 is incompatible with kafka-streams 2.6.

The existing code had the beans below with the old versions (kafka-streams 2.4.0, Spring Boot 2.2):

@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder)  {
 //Your topology code
 return streamsBuilder.build();
}
    
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
//Your kafka streams code
return kafkaStreams;
}

Now, after upgrading kafka-streams and kafka-clients to 2.6.2 and spring-kafka to 2.6.x, the following exception was observed:

2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN   o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
   

Upvotes: 3

Views: 9584

Answers (10)

Madhura Maddipatla

Reputation: 23

I was getting a similar error while running my application locally. Because of spring-boot-devtools, whenever I edited a source file Spring Boot DevTools picked up the change and restarted the application. As a result, multiple instances of Kafka Streams were running in the same state directory.

After removing the DevTools dependency I was able to resolve the error.

Upvotes: 0

valeesi

Reputation: 195

I tried everything, but this was the only thing that worked for me:

rm -rf /tmp/kafka-streams

Where /tmp/kafka-streams is the stateDirectory

props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirectory);

Upvotes: 0

Maninder

Reputation: 1919

I had the same issue because I was running around 7-8 streams together. I fixed it by creating a unique application id for each stream. Here is the working code for me:


public Properties properties() {
    Properties props = new Properties();
    // A unique application id per stream keeps each instance in its own state directory
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "my-group-id");
    return props;
}

Upvotes: 1

Paul Crofts

Reputation: 31

I was facing the same problem: a single topology in Spring Boot, and I was trying to access the state store for interactive queries. In order to do so I needed a KafkaStreams object, as shown below.

GlobalKTable<String, String> configTable = builder.globalTable("config", 
  Materialized.<String, String, KeyValueStore<Bytes, byte[]>> as("config-store")
  .withKeySerde(Serdes.String())
  .withValueSerde(Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), kconfig.asProperties());
streams.start();

ReadOnlyKeyValueStore<String, String> configView = streams.store(StoreQueryParameters.fromNameAndType("config-store", QueryableStoreTypes.keyValueStore()));

The problem is that the Spring Kafka factory bean already starts the topology, so calling streams.start() starts a second instance, which then fails to take the lock on the state store.

This can be fixed by setting the auto-startup property to false:

spring.kafka.streams.auto-startup=false

That's all you need.
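
For completeness, a minimal sketch (not part of the original fix, just a common pattern) of wrapping that manually started instance in a bean so the application context also closes it on shutdown; builder and kconfig are the same objects used in the snippet above:

// With spring.kafka.streams.auto-startup=false this is the only KafkaStreams that ever starts,
// so nothing else competes for the state directory lock
@Bean(destroyMethod = "close")
public KafkaStreams interactiveQueryStreams() {
    KafkaStreams streams = new KafkaStreams(builder.build(), kconfig.asProperties());
    streams.start();
    return streams;
}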

Upvotes: 1

Jay Ehsaniara

Reputation: 1529

A similar error can happen when you are running multiple instances of the same application (same name/id) on the same machine.

Please see state.dir in the Kafka Streams configuration to get the idea.

You can add that to the Kafka configuration and make it unique for each instance.

In case you are using Spring Cloud Stream (two instances can't use the same port on the same machine):

 spring.cloud.stream.kafka.streams.binder.configuration.state.dir: ${spring.application.name}${server.port}

UPDATE:

In the case of Spring Kafka Streams:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(APPLICATION_ID_CONFIG, springApplicationName);
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(StreamsConfig.STATE_DIR_CONFIG, String.format("%s%s", springApplicationName, serverPort));

    return new KafkaStreamsConfiguration(props);
}

or:

spring.kafka:
    bootstrap-servers: ....
    streams:
      properties:
        application.server: localhost:${server.port}
        state.dir: ${spring.application.name}${server.port}

Upvotes: 4

elhose

Reputation: 348

In my case it worked perfectly to specify a separate @TestConfiguration class in which I keep a counter that changes the application id for each Spring Boot test context.

@TestConfiguration
public class TestKafkaStreamsConfig {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        final var props = new HashMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-id-" + COUNTER.getAndIncrement());

//      rest of configuration  
        return new KafkaStreamsConfiguration(props);
    }
}

Of course I had to enable Spring bean overriding to replace the primary configuration.

Edit: I'm using Spring Boot 2.5.10, so in my case, to make use of the @TestConfiguration I have to pass it to the @SpringBootTest(classes = ...) annotation.

Upvotes: 2

ViJ

Reputation: 86

The answers above about setting the state dir worked perfectly for me, thanks. One observation that may help anyone working with Spring Boot: when bringing up multiple Kafka Streams application instances on the same machine, if you have spring.devtools.restart.enabled set (which is usually the case in a dev profile), you may want to disable it, because when an instance restarts automatically it may fail to acquire the store lock. This is what I was facing, and I resolved it by disabling the restart behavior.
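
For example, this is a one-line change using the property mentioned above (where you put it, e.g. a dev-profile application.properties, depends on your setup):

# Disable automatic restarts so DevTools does not spawn a second Kafka Streams instance
spring.devtools.restart.enabled=false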

Upvotes: 1

Sergey Shcherbakov

Reputation: 4778

If you have a sophisticated Kafka Streams topology in your Spring Cloud Stream Kafka Streams Binder 3.0 style application, you might need to specify different application ids for different functions, like the following:

spring.cloud.stream.function.definition: myFirstStream;mySecondStream
...
spring.cloud.stream.kafka.streams:
  binder:
    functions:
      myFirstStream:
        applicationId: app-id-1
      mySecondStream:
        applicationId: app-id-2

Upvotes: 2

dev_fondue

Reputation: 121

I ran into this problem on the following versions:

  • org.springframework.boot version 2.5.3
  • org.springframework.kafka:spring-kafka:2.7.5
  • org.apache.kafka:kafka-clients:2.8.0
  • org.apache.kafka:kafka-streams:2.8.0

Check this: the State directory (state.dir) setting in the Kafka Streams configuration.

By default the lock file is created in a temp folder under the Kafka Streams app id, like: /var/folders/xw/xgslnvzj1zj6wp86wpd8hqjr0000gn/T/kafka-streams/${spring.kafka.streams.application-id}/.lock

If two or more Kafka Streams apps use the same spring.kafka.streams.application-id, then you get this exception. So just change your Kafka Streams app ids.

Or set the directory manually via StreamsConfig.STATE_DIR_CONFIG in the streams config.

Upvotes: 2

Jay Ghiya

Reputation: 504

The problem here is that newer versions of spring-kafka automatically initialize one more Kafka Streams instance from the Topology bean, while another instance is still initialized from the existing GenericKafkaStreams bean in the code base. This results in multiple threads trying to take the lock on the same state directory, hence the error.

Even disabling KafkaAutoConfiguration at the Spring Boot level does not disable this behavior. This was a real pain to identify and cost a lot of time.

The fix is to get rid of the Topology bean and keep only our own custom Kafka Streams bean, as in the code below:

// The Topology is no longer exposed as a Spring bean, so spring-kafka does not start it on its own
protected Topology customTopology() {
    // topology code
    return streamsBuilder.build();
}

/**
 * Starts the Kafka Streams application and sets the state listener and state
 * store listener.
 *
 * @return KafkaStreams
 */
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
    KafkaStreams kafkaStreams = new KafkaStreams(customTopology(), kstreamsconfigs);
    return kafkaStreams;
}

Upvotes: 2
