xmar

Reputation: 1819

Kafka Streams (Scala): Invalid topology: StateStore is not added yet

I have a topology where I have a stream A.

From that stream A, I create a WindowedStore S.

       A  --> [S]

Then I want the objects in A to be transformed depending on data in S, and I also want these transformed objects to reach the WindowStore logic (via transformValues).

To do that, I create a Transformer that produces a stream A', and I make the windowing aware of it (i.e. S is now built from A', not from A).

  A -> A'  --> [S]
       ^__read__|

But I cannot do that, because when I create the Topology, an exception is thrown:

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.

Is there a way to work around this? Is this a limitation?

Code example:

  // A 
  val sessionElementsStream: KStream[K, SessionElement] = ...
  // A'
  val sessionElementsTransformed : KStream[K, SessionElementTransformed] = {
    // Here we use the sessionStoreName - but it is not added yet to the Topology
    sessionElementsStream.
      transformValues(sessionElementTransformerSupplier, sessionStoreName)
  }

  val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
    sessionElementsTransformed.
      groupByKey(sessionElementTransformedGroupedBy).
      windowedBy(sessionWindows)
  }

  // [S]: this aggregation is what adds sessionStoreName to the topology
  val sessionStore : KTable[Windowed[K], List[WindowedSession]] =
    sessionElementsWindowedStream.aggregate(
        initializer = List.empty[WindowedSession])(
        aggregator = anAggregator, merger = aMerger)(
        materialized = getMaterializedMUPKSessionStore(sessionStoreName))

The original problem is that, depending on the values of previous sessions, I would like to change the sessions that come after them. If I do this in a transformer after the sessioning, the transformed sessions can be changed and sent downstream, but their new state is not reflected in S, so further requests to the store return the old values.

Kafka Streams 2.1, Scala 2.12.4. Co-partitioned topics.

UPDATE

There is a way to do this within the DSL, using an extra topic:

However, it sounds cumbersome to have to use an extra topic here. Is there no other, simpler way to solve it?

Upvotes: 1

Views: 2509

Answers (1)

miguno

Reputation: 15087

> But I cannot do that, because when I create the Topology, an exception is thrown:
>
> Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.

It looks like you simply forgot to literally "add" the state store to your processing topology, and then attach ("make available") the state store to your Transformer.

Here's a code snippet that demonstrates this (sorry, in Java).

Adding the state store to your topology:

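import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<String, Long>> myStateStore =
    Stores.keyValueStoreBuilder(
             Stores.persistentKeyValueStore("my-state-store-name"),
             Serdes.String(),
             Serdes.Long())
          .withCachingEnabled();
// Register the store with the topology before any processor refers to it by name.
builder.addStateStore(myStateStore);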

Attaching the state store to your Transformer:

final KStream<String, Double> stream = builder.stream("your-input-topic", Consumed.with(Serdes.String(), Serdes.Double()));

final KStream<String, Long> transformedStream =
    stream.transform(new MyTransformer(myStateStore.name()), myStateStore.name());

And of course your Transformer must integrate the state store, with code like the following (this Transformer reads <String, Double> and writes <String, Long>).

class MyTransformer implements TransformerSupplier<String, Double, KeyValue<String, Long>> {

  private final String myStateStoreName;

  MyTransformer(final String myStateStoreName) {
    this.myStateStoreName = myStateStoreName;
  }

  @Override
  public Transformer<String, Double, KeyValue<String, Long>> get() {
    return new Transformer<String, Double, KeyValue<String, Long>>() {

      private KeyValueStore<String, Long> myStateStore;
      private ProcessorContext context;

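      @Override
      @SuppressWarnings("unchecked")
      public void init(final ProcessorContext context) {
        this.context = context;
        // Look up the store that was attached by name in stream.transform(...).
        myStateStore = (KeyValueStore<String, Long>) context.getStateStore(myStateStoreName);
      }

      // ... (transform() and close() go here)
    };
  }
}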
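
To illustrate why the order matters, here is a minimal, self-contained model of the check that the exception message points to. This is only a sketch and not Kafka Streams code: the class TinyTopology and its methods are hypothetical stand-ins, and IllegalStateException stands in for TopologyException. The assumption is simply that a processor can only be connected to a store name that has already been registered.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a topology builder (not the Kafka Streams API).
final class TinyTopology {
    private final Set<String> stores = new HashSet<>();
    private final Map<String, Set<String>> processorStores = new HashMap<>();

    // Registers a state store by name (plays the role of builder.addStateStore(...)).
    TinyTopology addStateStore(String storeName) {
        stores.add(storeName);
        return this;
    }

    // Adds a processor and connects it to the named stores.
    // Fails if any of the stores has not been registered yet.
    TinyTopology addProcessor(String processorName, String... storeNames) {
        Set<String> connected = new HashSet<>();
        for (String storeName : storeNames) {
            if (!stores.contains(storeName)) {
                throw new IllegalStateException(
                    "Invalid topology: StateStore " + storeName + " is not added yet.");
            }
            connected.add(storeName);
        }
        processorStores.put(processorName, connected);
        return this;
    }

    // Reports whether the processor was successfully connected to the store.
    boolean isConnected(String processorName, String storeName) {
        Set<String> connected = processorStores.get(processorName);
        return connected != null && connected.contains(storeName);
    }

    public static void main(String[] args) {
        // Wrong order: the processor names a store that nothing has registered yet.
        try {
            new TinyTopology().addProcessor("transformer", "session-store");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Right order: register the store first, then attach it to the processor.
        TinyTopology ok = new TinyTopology()
            .addStateStore("session-store")
            .addProcessor("transformer", "session-store");
        System.out.println(ok.isConnected("transformer", "session-store"));
    }
}
```

Running main prints "Invalid topology: StateStore session-store is not added yet." followed by "true". In the question's topology the store only comes into existence with the aggregation, which is declared after transformValues, so the first case applies; registering the store explicitly beforehand corresponds to the second case.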

Upvotes: 2
