Reputation: 41
I have a requirement to use Kafka Streams to aggregate data from two different topics and produce a summary as output. The problem is that the application where this is going to be implemented is meant to run in multiple instances, and in each instance we have to start the streams in order to query the state store. Because of this I am getting the error below:
org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, test.topic.store.1, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:67) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53) ~[kafka-streams-3.0.0.jar:na]
at com.example.test.SimpleController.accessStore(SimpleController.java:41) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.14.jar:5.3.14]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.14.jar:5.3.14]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.14.jar:5.3.14]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.14.jar:5.3.14]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.14.jar:5.3.14]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.14.jar:5.3.14]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.14.jar:5.3.14]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
I understand the gist of what is happening. When I start the streams from the first instance, everything works fine. When I start the streams on the second instance, the partitions are revoked, the stream goes into the rebalancing state, and then starts running.
2022-03-11 18:24:21.638 INFO 23128 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1-consumer, groupId=STREAMS_APP_] Adding newly assigned partitions:
2022-03-11 18:24:21.638 INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
2022-03-11 18:24:21.724 INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] Restoration took 86 ms for all tasks []
2022-03-11 18:24:21.724 INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-03-11 18:24:21.724 INFO 23128 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5] State transition from REBALANCING to RUNNING
But once this happens I am not able to access the state store from either instance, ending up with the "The state store, test.topic.store.1, may have migrated to another instance" error. Is there a way I can reliably read a KTable from two different instances of the application without the store getting migrated?
Below is the code I am using.
Config:
@Bean
public StreamsBuilder simpleKafkaStreamBuilder() throws UnknownHostException {
    final StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Test> table1 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV1());
    KTable<String, Test> table2 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV2());
    return builder;
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV1() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE1))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV2() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE2))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV3() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Test aggregateData(String key, String value, Test test) {
    test.setKey(key);
    test.incrCount();
    return test;
}
Controller
@RestController
public class SimpleController {

    private final StreamsBuilder simpleKafkaStreamBuilder;

    @Autowired
    public SimpleController(StreamsBuilder simpleKafkaStreamBuilder) {
        this.simpleKafkaStreamBuilder = simpleKafkaStreamBuilder;
    }

    @Value("${server.port}")
    private String serverPort;

    private static KafkaStreams kafkaStreams;

    @GetMapping("/start-streams")
    public void startStreams() throws UnknownHostException {
        Properties properties = SimpleKafkaUtil.getDefaultStreamProperties();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, InetAddress.getLocalHost().getHostAddress() + ":" + serverPort);
        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        kafkaStreams = new KafkaStreams(simpleKafkaStreamBuilder.build(), properties);
        System.out.println("[x] stream 1 " + kafkaStreams.toString());
        // kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    @GetMapping("/access-store/{key}")
    public void accessStore(@PathVariable("key") String key) throws InterruptedException {
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)) {
            Thread.sleep(5000);
        }
        List<StreamsMetadata> metadataList = (List<StreamsMetadata>) kafkaStreams.streamsMetadataForStore(SimpleKafkaConfig.TEST_STORE1);
        KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(SimpleKafkaConfig.TEST_STORE1, key, Serdes.String().serializer());
        System.out.println(" [x] Host: " + metadata.activeHost().host());
        System.out.println(" [x] Host: " + metadata.activeHost().port());
        String hostUri = "http://" + metadata.activeHost().host() + ":" + metadata.activeHost().port();
        RestTemplate restTemplate = new RestTemplate();
        Test test = restTemplate.getForObject(hostUri + "/access-storeV2/" + key, Test.class);
        System.out.println("[x] KEY:" + key);
        System.out.println("[x] VALUE:" + test);
    }

    @GetMapping("/access-storeV2/{key}")
    public Test accessStoreV2(@PathVariable("key") String key) throws InterruptedException {
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)) {
            Thread.sleep(5000);
        }
        ReadOnlyKeyValueStore<String, Test> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(SimpleKafkaConfig.TEST_STORE1, QueryableStoreTypes.keyValueStore()));
        System.out.println("[x] Store KEY:" + key);
        System.out.println("[x] Store VALUE:" + store.get(key));
        return store.get(key);
    }
}
Upvotes: 0
Views: 1482
Reputation: 41
Reference: Kafka Interactive Queries
For a multi-instance application setup, if you need to use Kafka Streams, there is one way to accommodate that setup. I will get there in a minute, but first we need to understand what happens when we start a stream in an application.
When a Kafka Streams instance starts over a topic, it is assigned a subset of the topic's partitions, and its state store holds only the shards for those partitions. When a second instance with the same application.id joins, a rebalance splits the partitions between the instances, so neither instance holds the whole store locally anymore. Querying a key whose store shard lives on another instance, or querying while a rebalance is still in flight, fails with:
org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, store.name, may have migrated to another instance.
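Note that while a rebalance is in flight the exception is transient, so one common pattern (a minimal sketch of mine, assuming the kafkaStreams field from the code below) is to retry the local lookup briefly:
// Retry the store lookup while a rebalance is in flight; the caller
// declares throws InterruptedException.
ReadOnlyKeyValueStore<String, Test> store = null;
while (store == null) {
    try {
        store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(
                SimpleKafkaConfig.TEST_STORE1, QueryableStoreTypes.keyValueStore()));
    } catch (InvalidStateStoreException e) {
        Thread.sleep(100); // store not yet queryable locally; retry shortly
    }
}
Retrying only helps when this instance ends up hosting the key; when the key lives on another instance, the request has to be routed there, which is what the rules below enable.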
These are some rules for a multi-instance Kafka Streams application:
- All instances must use the same application.id in the streams config.
- Each instance must set application.server = host:port (its own reachable address) in the streams config; this is important.
Once we do this, whenever we start the stream from any instance of the application, the stream publishes the server details from application.server in the streams' metadata, which we can access using streams.allMetadataForStore(storeName). We can then use this metadata to find out which server is holding a given key (a routing sketch follows the snippet):
StreamsMetadata metadata = streams.metadataForKey(storeName, key, keySerializer);
metadata.host(); // gives us the server host
metadata.port(); // gives us the server port
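For illustration, the routing decision can look like the sketch below; thisHost and thisPort are hypothetical fields assumed to hold the same values this instance put into application.server, and restTemplate and storeName are assumed to be in scope:
// Serve locally when this instance hosts the key, otherwise forward
// the request to the active host over HTTP.
StreamsMetadata metadata = kafkaStreams.metadataForKey(storeName, key, Serdes.String().serializer());
if (metadata.host().equals(thisHost) && metadata.port() == thisPort) {
    ReadOnlyKeyValueStore<String, Test> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
    return store.get(key); // key is hosted by this instance
} else {
    String hostUri = "http://" + metadata.host() + ":" + metadata.port();
    return restTemplate.getForObject(hostUri + "/access-storeV2/" + key, Test.class);
}
The working code below always forwards over HTTP, which also works when the active host happens to be the local instance; skipping the hop for local keys is just an optimization.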
We can use this data to make a call to the actual server that holds the value, via a REST call in my case. I am posting some snippets of the actual working code below.
Config
@Bean
public StreamsBuilder simpleKafkaStreamBuilder() throws UnknownHostException {
    final StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Test> table1 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV1());
    return builder;
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV1() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE1))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Test aggregateData(String key, String value, Test test) {
    test.setKey(key);
    test.incrCount();
    return test;
}

public static Properties getDefaultStreamProperties() {
    final Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCAL_KAFKA_SERVER);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "STREAMS_APP_");
    return props;
}
RestController
@GetMapping("/start-streams")
public void startStreams() throws UnknownHostException {
Properties properties = SimpleKafkaUtil.getDefaultStreamProperties();
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, InetAddress.getLocalHost().getHostAddress() + ":" + serverPort);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
kafkaStreams = new KafkaStreams(simpleKafkaStreamBuilder.build(properties), properties);
System.out.println("[x] stream 1 " + kafkaStreams.toString());
kafkaStreams.start();
}
@GetMapping("/access-store/{key}")
public void accessStore(@PathVariable("key") String key) throws InterruptedException {
System.out.println("[x] stream 2 " + kafkaStreams.toString());
while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
Thread.sleep(5000);
}
List<StreamsMetadata> metadataList = (List<StreamsMetadata>) kafkaStreams.allMetadataForStore(SimpleKafkaConfig.TEST_STORE1);
metadataList.forEach(metadata -> {
System.out.println(" [x] Host from list: " + metadata.host());
System.out.println(" [x] Port from list: " + metadata.port());
});
StreamsMetadata metadata = kafkaStreams.metadataForKey(SimpleKafkaConfig.TEST_STORE1, key, Serdes.String().serializer());
System.out.println(" [x] Host: " + metadata.host());
System.out.println(" [x] Port: " + metadata.port());
String hostUri = "http://" + metadata.host() + ":" + metadata.port();
RestTemplate restTemplate = new RestTemplate();
Test test = restTemplate.getForObject(hostUri + "/access-storeV2/" + key, Test.class);
System.out.println("[x] KEY:" + key);
System.out.println("[x] VALUE:" + test);
}
@GetMapping("/access-storeV2/{key}")
public Test accessStoreV2(@PathVariable("key") String key) throws InterruptedException{
System.out.println("[x] stream 2 " + kafkaStreams.toString());
while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
Thread.sleep(5000);
}
ReadOnlyKeyValueStore<String, Test> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(SimpleKafkaConfig.TEST_STORE1, QueryableStoreTypes.keyValueStore()));
System.out.println("[x] Store KEY:" + key);
System.out.println("[x] Store VALUE:" + store.get(key));
return store.get(key);
}
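As a side note on the readiness loop: rather than polling the state every five seconds, a state listener can be registered before start(). A minimal sketch (the latch is illustrative and only covers the initial startup, not later rebalances):
// Register before kafkaStreams.start(); needs java.util.concurrent imports.
final CountDownLatch runningLatch = new CountDownLatch(1);
kafkaStreams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.RUNNING) {
        runningLatch.countDown();
    }
});
kafkaStreams.start();

// In a request handler, wait with a timeout instead of busy-sleeping:
if (!runningLatch.await(30, TimeUnit.SECONDS)) {
    throw new IllegalStateException("Streams instance did not reach RUNNING in time");
}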
Upvotes: 2