Mark Lavin

Reputation: 1242

How do I set up a State Store for a Transformer

I'm trying to create a Transformer and am running into problems initializing its StateStore. I looked at the example in "How to register a stateless processor (that seems to require a StateStore as well)?" and it makes sense, but I'm trying something different:

KeyValueBytesStoreSupplier groupToKVStore_supplier = 
    Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< G, KeyValue< K, V > > > groupToKVStore_builder =
    Stores.keyValueStoreBuilder( groupToKVStore_supplier, Gserde, KVserde );
stream_builder.addStateStore( groupToKVStore_builder );

My intention is to use a String as the State Store key and a KeyValue as the State Store value. Is the formulation above correct? I'm asking because when the stream containing my Transformer is starting up, it throws an exception that says:

Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-TRANSFORM-0000000001 has no access to StateStore state_store_1582785598
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:72)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.init(WindowedTimeSorter.java:135)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.init(KStreamTransform.java:51)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:54)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:10

Per Matthias' suggestion, I added a StateStore name argument to the transform invocation in my Stream, and that appears to get us past the error shown above. However, we then get the following exception:

ERROR stream-thread [A.Completely.Different.appID-b04af4b4-fdbb-4353-9aa5-6d71f7c22f9e-StreamThread-1] Failed to process stream task 0_1 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105) 
java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:167)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:1)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)

Alas, things are still not quite right. First, my Transformer's init method is being called three times; it should only be called once, right? Second, I'm getting a runtime error in my Transformer's transform method the first time it tries to put something into the StateStore:

INFO stream-thread [A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1124) 
Exception in thread "A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1" java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:155)

Upvotes: 0

Views: 2387

Answers (1)

Matthias J. Sax

Reputation: 62350

Just adding the store to the topology is not sufficient. You additionally need to connect the store to the transformer by passing the store name into transform():

stream.transform(..., state_store_name);

Update:

For the second exception, I assume that your TransformerSupplier#get() returns the same object on every call instead of a new one. As the "supplier pattern" suggests, you need to create a new object each time #get() is called (otherwise a supplier would not make sense, and it would be possible to hand in a single object directly). Compare the FAQ: https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata
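
To illustrate why a shared instance causes trouble, here is a minimal plain-Java sketch of the supplier pattern. It does not use the Kafka Streams API: TaskBoundWorker and startTasks are hypothetical stand-ins for a Transformer and for the framework calling get() and then init() once per task, and the three task ids are only an assumption that would match init being called three times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierPatternSketch {

    // Hypothetical stand-in for a Transformer: it remembers the task that initialized it last.
    static class TaskBoundWorker {
        private String boundTask;

        void init(String taskId) {
            this.boundTask = taskId;
        }

        String boundTask() {
            return boundTask;
        }
    }

    // Hypothetical stand-in for the framework: one get() and one init() per task.
    static List<TaskBoundWorker> startTasks(Supplier<TaskBoundWorker> supplier, String... taskIds) {
        List<TaskBoundWorker> workers = new ArrayList<>();
        for (String taskId : taskIds) {
            TaskBoundWorker worker = supplier.get();
            worker.init(taskId);
            workers.add(worker);
        }
        return workers;
    }

    public static void main(String[] args) {
        // Broken: the supplier hands out the same object, so each init() overwrites the previous one.
        TaskBoundWorker shared = new TaskBoundWorker();
        List<TaskBoundWorker> broken = startTasks(() -> shared, "0_0", "0_1", "0_2");

        // Correct: the supplier creates a new object per call, so each task keeps its own state.
        List<TaskBoundWorker> correct = startTasks(TaskBoundWorker::new, "0_0", "0_1", "0_2");

        System.out.println("shared:   worker of task 0_0 is bound to " + broken.get(0).boundTask());
        System.out.println("separate: worker of task 0_0 is bound to " + correct.get(0).boundTask());
    }
}
```

With the shared instance, the worker handed to task 0_0 ends up bound to task 0_2, because the last init() wins. If your Transformer stores the ProcessorContext in a field in the same way, a task may end up using a context that belongs to another task, which would be consistent with the IllegalStateException above. Creating the object inside get() avoids this.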

Upvotes: 2
