Charles Finney

Reputation: 41

Kafka Streams: The state store, test.topic.store.1, may have migrated to another instance

I have a requirement to use Kafka Streams to aggregate data from two different topics and produce a summary as output. The problem is that the application where this will be implemented is meant to run as multiple instances, and each instance has to start the streams in order to query the state store. Because of this I am getting the below error:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, test.topic.store.1, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:67) ~[kafka-streams-3.0.0.jar:na]
    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53) ~[kafka-streams-3.0.0.jar:na]
    at com.example.test.SimpleController.accessStore(SimpleController.java:41) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.14.jar:5.3.14]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.14.jar:5.3.14]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.14.jar:5.3.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.14.jar:5.3.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.14.jar:5.3.14]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.14.jar:5.3.14]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

I understand the gist of it: when I start the streams from the first instance, everything works fine. When I start the streams on the second instance, the partitions are revoked, the stream goes into the rebalancing state, and then starts running.

2022-03-11 18:24:21.638  INFO 23128 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1-consumer, groupId=STREAMS_APP_] Adding newly assigned partitions: 
2022-03-11 18:24:21.638  INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
2022-03-11 18:24:21.724  INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] Restoration took 86 ms for all tasks []
2022-03-11 18:24:21.724  INFO 23128 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-03-11 18:24:21.724  INFO 23128 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [STREAMS_APP_-e058675d-800a-4c91-8dd3-1f24025f20d5] State transition from REBALANCING to RUNNING

But once this happens I am not able to access the state store from either instance, ending up with the "The state store, test.topic.store.1, may have migrated to another instance" error. Is there a reliable way to read a KTable from two different instances of the application without the store getting migrated?

Below is the code I am using.

Config:

@Bean
public StreamsBuilder simpleKafkaStreamBuilder() throws UnknownHostException {
    final StreamsBuilder builder = new StreamsBuilder();

    KTable<String, Test> table1 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV1());

    KTable<String, Test> table2 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV2());

    return builder;
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV1() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE1))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV2() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE2))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV3() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Test aggregateData(String key, String value, Test test) {
    test.setKey(key);
    test.incrCount();
    return test;
}

Controller

@RestController
public class SimpleController {
    private final StreamsBuilder simpleKafkaStreamBuilder;

    @Autowired
    public SimpleController(StreamsBuilder simpleKafkaStreamBuilder) {
        this.simpleKafkaStreamBuilder = simpleKafkaStreamBuilder;
    }

    @Value("${server.port}")
    private String serverPort;

    private static KafkaStreams kafkaStreams;

    @GetMapping("/start-streams")
    public void startStreams() throws UnknownHostException {
        Properties properties = SimpleKafkaUtil.getDefaultStreamProperties();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, InetAddress.getLocalHost().getHostAddress() + ":" + serverPort);
        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        kafkaStreams = new KafkaStreams(simpleKafkaStreamBuilder.build(), properties);
        System.out.println("[x] stream 1 " + kafkaStreams.toString());
//        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    @GetMapping("/access-store/{key}")
    public void accessStore(@PathVariable("key") String key) throws InterruptedException {
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
            Thread.sleep(5000);
        }

        List<StreamsMetadata> metadataList = (List<StreamsMetadata>) kafkaStreams.streamsMetadataForStore(SimpleKafkaConfig.TEST_STORE1);

        KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(SimpleKafkaConfig.TEST_STORE1, key, Serdes.String().serializer());
        System.out.println(" [x] Host: " + metadata.activeHost().host());
        System.out.println(" [x] Host: " + metadata.activeHost().port());

        String hostUri = "http://" + metadata.activeHost().host() + ":" + metadata.activeHost().port();

        RestTemplate restTemplate = new RestTemplate();
        Test test = restTemplate.getForObject(hostUri + "/access-storeV2/" + key, Test.class);
        System.out.println("[x] KEY:" + key);
        System.out.println("[x] VALUE:" + test);
    }

    @GetMapping("/access-storeV2/{key}")
    public Test accessStoreV2(@PathVariable("key") String key) throws InterruptedException{
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
            Thread.sleep(5000);
        }

        ReadOnlyKeyValueStore<String, Test> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(SimpleKafkaConfig.TEST_STORE1, QueryableStoreTypes.keyValueStore()));
        System.out.println("[x] Store KEY:" + key);
        System.out.println("[x] Store VALUE:" + store.get(key));

        return store.get(key);
    }
}

Upvotes: 0

Views: 1482

Answers (1)

Charles Finney

Reputation: 41

Reference: Kafka Interactive Queries

For a multi-instance application setup that needs Kafka Streams, there is one way to accommodate it. I will get there in a minute, but first we need to understand what happens when we start a stream in an application.

When a Kafka stream is started over a topic in an application, this is what happens:

  1. The topology for the stream is created, i.e., the elements we configure are attached to the stream in the form of nodes. For example, if I create a KTable out of a stream, that KTable is added as a node to the topic's processing graph, hence the name Topology. (The sketch after this list prints one such topology.)
  2. The application gets assigned partitions of the topic (now this is important). A single partition is processed by exactly one instance at a time, though one instance can process multiple partitions. So if we want multiple instances of an application to be connected to a topic through Kafka Streams, that topic should have at least as many partitions as there are instances. That does half the magic. If the number of partitions is less than the number of instances, the surplus instances are assigned no partitions and therefore host no local copy of the state store, so querying the store on such an instance, or while a rebalance is still moving stores between instances, throws the exception
org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, store.name, may have migrated to another instance.
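To make point 1 concrete, here is a minimal sketch (the topic and class names are hypothetical) that prints the nodes Kafka Streams builds from a few DSL calls:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

public class TopologyPreview {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // counting per key adds a KTable node backed by a state store to the graph
        builder.stream("test.topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .count();
        // describe() prints the source, aggregation and state store nodes
        System.out.println(builder.build().describe());
    }
}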

These are the rules for a multi-instance Kafka Streams application (a minimal configuration sketch follows the list).

  1. The number of topic partitions should be greater than or equal to the number of instances of the application.
  2. All the instances of the application should have the same application.id in the streams config.
  3. We need to add one more config, application.server = host:port; this is important.
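
A minimal configuration sketch under these rules (the bootstrap address and the host/port values are placeholders; each instance passes its own endpoint):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class MultiInstanceStreamProps {
    public static Properties forInstance(String host, int port) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // broker address (placeholder)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "STREAMS_APP_");        // identical on every instance
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port); // unique per instance
        return props;
    }
}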

Once we do this, whenever we start the stream from any instance of the application, Kafka Streams stores the server details from application.server in the streams metadata, which we can access using streams.allMetadataForStore(storeName). We can then use this metadata to find out which server holds a given key:

StreamsMetadata metadata = streams.metadataForKey(storeName, key, keySerializer);
metadata.host(); // gives us the server host
metadata.port(); // gives us the server port
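
Note that metadataForKey has been deprecated since Kafka Streams 2.5 (and allMetadataForStore in later releases) in favor of queryMetadataForKey and streamsMetadataForStore; on the 3.0.0 jars shown in the question's stack trace, the same lookup looks roughly like this:

KeyQueryMetadata keyMetadata = streams.queryMetadataForKey(storeName, key, keySerializer);
keyMetadata.activeHost().host(); // host of the instance actively serving the key
keyMetadata.activeHost().port(); // port of that instance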

We can use this data to call the actual server that holds the value, via a REST call in my case. I am posting snippets of the actual working code below.

Config

@Bean
public StreamsBuilder simpleKafkaStreamBuilder() throws UnknownHostException {
    final StreamsBuilder builder = new StreamsBuilder();

    KTable<String, Test> table1 = builder.stream(TEST_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .aggregate(Test::new, this::aggregateData, materializeStoreV1());

    return builder;
}

private Materialized<String, Test, KeyValueStore<Bytes, byte[]>> materializeStoreV1() {
    return Materialized.<String, Test>as(Stores.inMemoryKeyValueStore(TEST_STORE1))
            .withKeySerde(Serdes.String())
            .withValueSerde(testSerde());
}

private Test aggregateData(String key, String value, Test test) {
    test.setKey(key);
    test.incrCount();
    return test;
}

public static Properties getDefaultStreamProperties() {
    final Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCAL_KAFKA_SERVER);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "STREAMS_APP_");
    return props;
}

RestController

@GetMapping("/start-streams")
    public void startStreams() throws UnknownHostException {
        Properties properties = SimpleKafkaUtil.getDefaultStreamProperties();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, InetAddress.getLocalHost().getHostAddress() + ":" + serverPort);
        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        kafkaStreams = new KafkaStreams(simpleKafkaStreamBuilder.build(properties), properties);
        System.out.println("[x] stream 1 " + kafkaStreams.toString());
        kafkaStreams.start();
    }

    @GetMapping("/access-store/{key}")
    public void accessStore(@PathVariable("key") String key) throws InterruptedException {
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
            Thread.sleep(5000);
        }

        List<StreamsMetadata> metadataList = (List<StreamsMetadata>) kafkaStreams.allMetadataForStore(SimpleKafkaConfig.TEST_STORE1);
metadataList.forEach(metadata -> {
        System.out.println(" [x] Host from list: " + metadata.host());
        System.out.println(" [x] Port from list: " + metadata.port());
});

        StreamsMetadata metadata = kafkaStreams.metadataForKey(SimpleKafkaConfig.TEST_STORE1, key, Serdes.String().serializer());
        System.out.println(" [x] Host: " + metadata.host());
        System.out.println(" [x] Port: " + metadata.port());

        String hostUri = "http://" + metadata.host() + ":" + metadata.port();

        RestTemplate restTemplate = new RestTemplate();
        Test test = restTemplate.getForObject(hostUri + "/access-storeV2/" + key, Test.class);
        System.out.println("[x] KEY:" + key);
        System.out.println("[x] VALUE:" + test);
    }

    @GetMapping("/access-storeV2/{key}")
    public Test accessStoreV2(@PathVariable("key") String key) throws InterruptedException{
        System.out.println("[x] stream 2 " + kafkaStreams.toString());
        while (!kafkaStreams.state().equals(KafkaStreams.State.RUNNING)){
            Thread.sleep(5000);
        }

        ReadOnlyKeyValueStore<String, Test> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(SimpleKafkaConfig.TEST_STORE1, QueryableStoreTypes.keyValueStore()));
        System.out.println("[x] Store KEY:" + key);
        System.out.println("[x] Store VALUE:" + store.get(key));
        return store.get(key);
    }

Upvotes: 2
