Reputation: 1819
I have a topology where I have a stream A. From that stream A, I create a WindowedStore S.

A --> [S]

Then I want to transform the objects in A depending on data in S, and also have these transformed objects arrive at the WindowedStore logic (via transformValues).

For that, I create a Transformer, producing a stream A', and make the windowing aware of it (i.e. S is now built from A', not from A).

A -> A' --> [S]
     ^__read__|
But I cannot do that, because when I create the Topology, an exception is thrown:
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.
Is there a way to work around this? Is this a limitation?
Code example:
// A
val sessionElementsStream: KStream[K, SessionElement] = ...

// A'
val sessionElementsTransformed: KStream[K, SessionElementTransformed] = {
  // Here we use the sessionStoreName - but it is not added yet to the Topology
  sessionElementsStream.
    transformValues(sessionElementTransformerSupplier, sessionStoreName)
}

val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
  sessionElementsTransformed.
    groupByKey(sessionElementTransformedGroupedBy).
    windowedBy(sessionWindows)
}

val sessionStore: KTable[Windowed[K], List[WindowedSession]] =
  sessionElementsWindowedStream.aggregate(
    initializer = List.empty[WindowedSession])(
    aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))
The original problem is that, depending on previous sessions' values, I would like to change the sessions that come after them. But if I do this in a transformer after the sessioning, these transformed sessions can be changed and sent downstream - but they won't reflect their new state in S - so further requests to the store will return the old values.
Kafka Streams 2.1, Scala 2.12.4. Co-partitioned topics.
UPDATE
There is a way to do this within the DSL, using an extra topic: write A' to this topic (via to), builder.stream from this topic, and build the store from it, as sketched below. However, it sounds cumbersome to have to use an extra topic here. Is there no other, simpler way to solve it?
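A minimal sketch of that workaround, reusing the names from the code above (the intermediate topic name is made up here, and implicit serdes for the key and value types are assumed to be in scope):

// Write A' out to an intermediate topic (topic name is hypothetical).
sessionElementsTransformed.to("session-elements-transformed")

// Read A' back from that topic...
val sessionElementsReRead: KStream[K, SessionElementTransformed] =
  builder.stream[K, SessionElementTransformed]("session-elements-transformed")

// ...and build S from the re-read stream instead of directly from A'.
val sessionStore: KTable[Windowed[K], List[WindowedSession]] =
  sessionElementsReRead.
    groupByKey(sessionElementTransformedGroupedBy).
    windowedBy(sessionWindows).
    aggregate(initializer = List.empty[WindowedSession])(
      aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))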
Upvotes: 1
Views: 2509
Reputation: 15087
But I cannot do that, because when I create the Topology, an exception is thrown:
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.
It looks like you simply forgot to literally "add" the state store to your processing topology, and then attach ("make available") the state store to your Transformer.
Here's a code snippet that demonstrates this (sorry, in Java).
Adding the state store to your topology:
final StreamsBuilder builder = new StreamsBuilder();

final StoreBuilder<KeyValueStore<String, Long>> myStateStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-state-store-name"),
        Serdes.String(),
        Serdes.Long())
    .withCachingEnabled();

builder.addStateStore(myStateStore);
Attaching the state store to your Transformer:
final KStream<String, Double> stream =
    builder.stream("your-input-topic", Consumed.with(Serdes.String(), Serdes.Double()));

final KStream<String, Long> transformedStream =
    stream.transform(new MyTransformer(myStateStore.name()), myStateStore.name());
And of course your Transformer must integrate the state store, with code like the following (this Transformer reads <String, Double> and writes <String, Long>):
class MyTransformer implements TransformerSupplier<String, Double, KeyValue<String, Long>> {

  private final String myStateStoreName;

  MyTransformer(final String myStateStoreName) {
    this.myStateStoreName = myStateStoreName;
  }

  @Override
  public Transformer<String, Double, KeyValue<String, Long>> get() {
    return new Transformer<String, Double, KeyValue<String, Long>>() {

      private KeyValueStore<String, Long> myStateStore;
      private ProcessorContext context;

      @Override
      public void init(final ProcessorContext context) {
        this.context = context;
        // The store attached via transform(..., myStateStore.name()) is
        // available through the ProcessorContext.
        myStateStore = (KeyValueStore<String, Long>) context.getStateStore(myStateStoreName);
      }

      // ...
    };
  }
}
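For completeness, the elided part (the // ... above) could look something like the following. This is just an illustrative placeholder - not from the original answer - with counting logic standing in for whatever per-key state you need, showing how the attached store is read and updated per record:

@Override
public KeyValue<String, Long> transform(final String key, final Double value) {
  // Read the previous state for this key (null if the key was never seen).
  final Long previousCount = myStateStore.get(key);
  final long newCount = (previousCount == null) ? 1L : previousCount + 1L;
  // Write back so that later records (and store queries) see the new state.
  myStateStore.put(key, newCount);
  return KeyValue.pair(key, newCount);
}

@Override
public void close() {
  // No-op: the state store itself is managed and closed by Kafka Streams.
}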
Upvotes: 2