Reputation: 504
This is about upgrading an existing production code base that uses windowing: kafka-clients, kafka-streams and spring-kafka go from 2.4.0 to 2.6.x, and spring-boot-starter-parent goes from 2.2.2.RELEASE to 2.3.x, because Spring Boot 2.2 is incompatible with kafka-streams 2.6.
The existing code had the beans below with the old versions (2.4.0 and Spring Boot 2.2):
@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder) {
    //Your topology code
    return streamsBuilder.build();
}

@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
    //Your kafka streams code
    return kafkaStreams;
}
After upgrading kafka-streams and kafka-clients to 2.6.2 and spring-kafka to 2.6.x, the following exception was observed:
2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
Upvotes: 3
Views: 9584
Reputation: 23
I was getting a similar error while running my application locally.
The cause was spring-boot-devtools: whenever I edited a source file, Spring Boot DevTools picked up the change and restarted the application, so multiple instances of Kafka Streams ended up running in the same state directory.
After removing spring-boot-devtools the error went away.
Upvotes: 0
Reputation: 195
I tried everything, but this was the only thing that worked for me:
rm -rf /tmp/kafka-streams
where /tmp/kafka-streams is the stateDirectory configured with:
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirectory);
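If you would rather do the same cleanup from Java (for example in a test teardown), here is a minimal stdlib-only sketch. The class and method names are made up for illustration and are not part of Kafka. Stop every Kafka Streams instance that uses the directory before deleting it. Kafka Streams also has KafkaStreams#cleanUp(), which removes the local state for the instance's application id and may only be called while the instance is not running; that is probably the better option when you have the KafkaStreams object at hand.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper, not a Kafka API: a Java counterpart of "rm -rf <stateDir>".
final class StateDirCleaner {

    /** Recursively deletes stateDir; does nothing if it does not exist. */
    static void deleteRecursively(Path stateDir) {
        if (!Files.exists(stateDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(stateDir)) {
            // Reverse order puts children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a throwaway directory tree, deletes it, and reports whether it is gone. */
    static boolean selfCheck() {
        try {
            Path root = Files.createTempDirectory("kafka-streams-cleanup-demo");
            Path task = Files.createDirectories(root.resolve("my-app").resolve("0_0"));
            Files.createFile(task.resolve(".checkpoint"));
            deleteRecursively(root);
            return !Files.exists(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("deleted: " + selfCheck());
    }
}
```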
Upvotes: 0
Reputation: 1919
I had the same issue because I was running around 7-8 streams together. I fixed it by giving each stream its own unique application id. Here is the code that works for me:
public Properties properties() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "my-group-id");
    return props;
}
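A random UUID changes on every restart. Because Kafka Streams also uses the application id as the consumer group id and as the name of the sub-folder inside the state directory, a fresh id means the stream starts again without its committed offsets or local state. If that is not what you want, a possible variation is to derive a unique but stable id per stream from its name. The sketch below uses only the JDK; the class name, the base name and the stream names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one stable application id per stream, derived from its name.
final class StreamAppIds {

    /** Maps each stream name to "<baseName>-<streamName>"; rejects duplicate names. */
    static Map<String, String> idsFor(String baseName, List<String> streamNames) {
        Map<String, String> ids = new LinkedHashMap<>();
        for (String name : streamNames) {
            String previous = ids.put(name, baseName + "-" + name);
            if (previous != null) {
                // Two streams with the same name would share an id and a state directory.
                throw new IllegalArgumentException("duplicate stream name: " + name);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Each value would be passed as StreamsConfig.APPLICATION_ID_CONFIG for that stream.
        System.out.println(idsFor("orders-app", Arrays.asList("enrich", "aggregate")));
    }
}
```

The ids stay the same across restarts, so each stream keeps its offsets and state, while no two streams share an id.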
Upvotes: 1
Reputation: 31
I was facing the same problem: a single topology in Spring Boot, where I was trying to access the state store for interactive queries. To do that I needed a KafkaStreams object, as shown below.
GlobalKTable<String, String> configTable = builder.globalTable("config",
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("config-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), kconfig.asProperties());
streams.start();
ReadOnlyKeyValueStore<String, String> configView = streams.store(
        StoreQueryParameters.fromNameAndType("config-store", QueryableStoreTypes.keyValueStore()));
The problem is that the Spring Kafka factory bean already starts the topology, so calling streams.start() starts a second instance, which then fails to get the lock on the state store.
This can be fixed by setting the auto-startup property to false:
spring.kafka.streams.auto-startup=false
That's all you need.
Upvotes: 1
Reputation: 1529
A similar error can happen when you run multiple instances of the same application (same name/id) on the same machine.
Please see the state.dir configuration to get the idea.
You can add it to the Kafka configuration and make it unique for each instance.
If you are using Spring Cloud Stream (two instances can't use the same port on the same machine):
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: ${spring.application.name}${server.port}
If you are using Kafka Streams through spring-kafka:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(APPLICATION_ID_CONFIG, springApplicationName);
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(StreamsConfig.STATE_DIR_CONFIG, String.format("%s%s", springApplicationName, serverPort));
    return new KafkaStreamsConfiguration(props);
}
or:
spring.kafka:
  bootstrap-servers: ....
  streams:
    properties:
      application.server: localhost:${server.port}
      state.dir: ${spring.application.name}${server.port}
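For plain Kafka Streams without Spring, the same idea can be sketched with the raw configuration keys ("application.id", "application.server", "state.dir"). This is only an illustration: the class name is made up, and placing the directory under java.io.tmpdir is my assumption, not something the snippets above do (they use a relative path).

```java
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: same application id for all instances, separate state.dir per instance.
final class PerInstanceStreamsProps {

    static Properties forInstance(String appName, int port) {
        Properties props = new Properties();
        // All instances of one application share the id, so they form one consumer group.
        props.put("application.id", appName);
        props.put("application.server", "localhost:" + port);
        // The port makes the directory unique for each instance on this machine.
        props.put("state.dir", Paths.get(System.getProperty("java.io.tmpdir"),
                "kafka-streams", appName + "-" + port).toString());
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forInstance("orders", 8080).getProperty("state.dir"));
        System.out.println(forInstance("orders", 8081).getProperty("state.dir"));
    }
}
```

The resulting Properties can be passed to the KafkaStreams constructor together with the bootstrap servers and serdes you already configure.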
Upvotes: 4
Reputation: 348
In my case it works perfectly to specify a separate @TestConfiguration class with a counter that changes the application id for each Spring Boot test context.
@TestConfiguration
public class TestKafkaStreamsConfig {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        final var props = new HashMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-id-" + COUNTER.getAndIncrement());
        // rest of configuration
        return new KafkaStreamsConfiguration(props);
    }
}
Of course, I had to enable Spring bean overriding to replace the primary configuration.
Edit: I'm using Spring Boot 2.5.10, so in my case, to make use of the @TestConfiguration, I have to pass it to the @SpringBootTest(classes =) annotation.
Upvotes: 2
Reputation: 86
The answers above about setting the state dir work perfectly for me, thanks. One more observation that might help someone working with Spring Boot: if you bring up multiple Kafka Streams application instances on the same machine and the property spring.devtools.restart.enabled is enabled (which is usually the case in a dev profile), you might want to disable it, because when the same application instance restarts automatically it might fail to get the state store lock. This is what I was facing, and I resolved it by disabling the restart behavior.
Upvotes: 1
Reputation: 4778
If you have a sophisticated Kafka Streams topology in a Spring Cloud Stream application that uses the Kafka Streams binder in the 3.0 functional style, you might need to specify a different application id for each function, like this:
spring.cloud.stream.function.definition: myFirstStream;mySecondStream
...
spring.cloud.stream.kafka.streams:
  binder:
    functions:
      myFirstStream:
        applicationId: app-id-1
      mySecondStream:
        applicationId: app-id-2
Upvotes: 2
Reputation: 121
I've handled problem on versions:
Check this: State directory
By default it is created in the temp folder, under the Kafka Streams app id, for example: /var/folders/xw/xgslnvzj1zj6wp86wpd8hqjr0000gn/T/kafka-streams/${spring.kafka.streams.application-id}/.lock
If two or more Kafka Streams apps on the same machine use the same spring.kafka.streams.application-id, you get this exception. So just give each Kafka Streams app its own id.
Alternatively, set the directory explicitly with StreamsConfig.STATE_DIR_CONFIG in the streams config.
Upvotes: 2
Reputation: 504
The problem is that newer versions of spring-kafka automatically initialize one more instance of Kafka Streams based on the Topology bean, while the existing code base initializes another one through the GenericKafkaStreams bean. The result is multiple threads trying to lock the same state directory, hence the error.
Even disabling KafkaAutoConfiguration at the Spring Boot level does not disable this behavior. This was a pain to identify and cost a lot of time.
The fix is to get rid of the Topology bean and keep only our own custom KafkaStreams bean, as in the code below:
protected Topology customTopology() {
    //topology code
    return streamsBuilder.build();
}

/**
 * This starts kafka stream application and sets the state listener and state
 * store listener.
 *
 * @return KafkaStreams
 */
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
    KafkaStreams kafkaStreams = new KafkaStreams(customTopology(), kstreamsconfigs);
    return kafkaStreams;
}
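To see why two instances inside one JVM collide, here is a small stdlib-only sketch of the general mechanism: a second attempt to lock the same .lock file is rejected while the first lock is held. It is an illustration using java.nio file locks, not Kafka Streams' actual implementation, and the class and method names are made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustration only: two "instances" in one JVM competing for the same lock file.
final class StateDirLockDemo {

    /** Returns true if the second lock attempt on the same .lock file is rejected. */
    static boolean secondLockIsRejected() {
        try {
            Path stateDir = Files.createTempDirectory("kafka-streams-lock-demo");
            Path lockFile = stateDir.resolve(".lock");
            try (FileChannel first = FileChannel.open(lockFile,
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                 FileChannel second = FileChannel.open(lockFile,
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                // The first "instance" takes the lock.
                FileLock held = first.tryLock();
                if (held == null) {
                    throw new IllegalStateException("lock unexpectedly held by another process");
                }
                try {
                    // The second "instance" tries to lock the same file.
                    FileLock again = second.tryLock();
                    if (again != null) {
                        again.release();
                        return false;
                    }
                    return true; // null: the lock is held elsewhere
                } catch (OverlappingFileLockException e) {
                    return true; // the lock is already held inside this JVM
                }
            } finally {
                // Closing the channels released the lock; remove the demo files.
                Files.deleteIfExists(lockFile);
                Files.deleteIfExists(stateDir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second lock rejected: " + secondLockIsRejected());
    }
}
```

With only one KafkaStreams bean left, there is only one owner of the state directory in the JVM, so this kind of conflict cannot occur.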
Upvotes: 2