Reputation: 21
Is there a way to add a global store for a Transformer to use? The docs for transform() say:
"Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and computes zero or more output records. In order to assign a state, the state must be created and registered beforehand via stores added via addStateStore or addGlobalStore before they can be connected to the Transformer"
Yet the API for addGlobalStore only takes a ProcessorSupplier:
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
stateUpdateSupplier: ProcessorSupplier[_, _])
My end goal is to use the Kafka Streams DSL with a Transformer, since I need a flatMap that transforms both keys and values before writing to my output topic. I do not have a Processor in my topology, though.
I would expect something like this:
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, _], stateUpdateSupplier: TransformerSupplier[_, _, _])
Upvotes: 2
Views: 1978
Reputation: 399
Use a Processor instead of a Transformer for all the transformations you want to perform on the input topic whenever you need to look up data from a GlobalStateStore. Use context.forward(key, value, childName) to send data to the downstream nodes. context.forward(key, value, childName) may be called multiple times in process() and punctuate(), so you can send multiple records to a downstream node. If you need to update the GlobalStateStore, do this only in the Processor passed to addGlobalStore(..), because a GlobalStreamThread is associated with the GlobalStateStore, and it keeps the state of the store consistent across all running KStreams instances.
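A minimal sketch of this Processor-based approach, written in Scala against the Processor API (Topology) and assuming a Kafka Streams 2.x release. The topic names, node names, the store name "lookup-store" and the comma-separated input format are all hypothetical. It calls context.forward(key, value), which sends to every child; the forward(key, value, childName) overload mentioned above was deprecated in 2.0 in favour of forward(key, value, To.child(childName)).

```scala
import org.apache.kafka.common.serialization.{Serdes, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.{KeyValueStore, ReadOnlyKeyValueStore, Stores}

// Sketch assuming Kafka Streams 2.x; all topic, node and store names are hypothetical.
object ProcessorApiEnrichment {
  val StoreName = "lookup-store"

  // State-update processor for the global store: the only place the store is written.
  class StoreUpdater extends AbstractProcessor[String, String] {
    private var store: KeyValueStore[String, String] = _

    override def init(ctx: ProcessorContext): Unit = {
      super.init(ctx)
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[KeyValueStore[String, String]]
    }

    // Copy records as-is; the store treats a null value (tombstone) as a delete.
    override def process(key: String, value: String): Unit = store.put(key, value)
  }

  // Looks up each input key in the global store and forwards zero or more records.
  class Splitter extends AbstractProcessor[String, String] {
    private var store: ReadOnlyKeyValueStore[String, String] = _

    override def init(ctx: ProcessorContext): Unit = {
      super.init(ctx)
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[ReadOnlyKeyValueStore[String, String]] // only read here
    }

    override def process(key: String, value: String): Unit = {
      val extra = store.get(key)
      if (extra != null && value != null) {
        val ctx = context()
        // Hypothetical format: one output record per comma-separated token.
        value.split(",").foreach(token => ctx.forward(s"$key-$token", s"$token:$extra"))
      }
    }
  }

  def buildTopology(): Topology = {
    val topology = new Topology()
    topology.addGlobalStore(
      Stores
        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(StoreName), Serdes.String(), Serdes.String())
        .withLoggingDisabled(), // global stores must not have a changelog topic
      "lookup-source", new StringDeserializer, new StringDeserializer,
      "lookup-topic", "lookup-updater",
      new ProcessorSupplier[String, String] { override def get() = new StoreUpdater })
    topology.addSource("input", new StringDeserializer, new StringDeserializer, "input-topic")
    topology.addProcessor("splitter",
      new ProcessorSupplier[String, String] { override def get() = new Splitter }, "input")
    topology.addSink("output", "output-topic", new StringSerializer, new StringSerializer, "splitter")
    topology
  }
}
```

The processor is wired with Topology.addProcessor rather than through the DSL because, in 2.x, KStream.process() is a terminal operation and its forwarded records cannot reach a downstream sink.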
Upvotes: 2
Reputation: 62350
The Processor that is passed into addGlobalStore() is used to maintain (i.e., write) the store. Note that it's currently expected that this Processor copies the data as-is into the store, because when the store is restored from the topic on startup this Processor is bypassed (cf. https://issues.apache.org/jira/browse/KAFKA-7663).
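To illustrate the store-maintaining side, here is a sketch of a state-update processor that copies every record into the store unchanged, registered through the Java StreamsBuilder from Scala (the Scala wrapper quoted in the question takes the same four arguments). It assumes Kafka Streams 2.x; "lookup-store" and "lookup-topic" are hypothetical names. The store builder has logging disabled, which global stores require because the source topic already acts as the changelog.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Sketch assuming Kafka Streams 2.x; store and topic names are hypothetical.
object GlobalStoreRegistration {
  val StoreName = "lookup-store"
  val LookupTopic = "lookup-topic"

  // Writes each record from the topic into the store unchanged. Do not filter or
  // transform here: on restore the store is reloaded from the topic directly.
  class StoreUpdater extends AbstractProcessor[String, String] {
    private var store: KeyValueStore[String, String] = _

    override def init(ctx: ProcessorContext): Unit = {
      super.init(ctx)
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[KeyValueStore[String, String]]
    }

    // A null value (tombstone) is treated as a delete by the store.
    override def process(key: String, value: String): Unit = store.put(key, value)
  }

  def register(builder: StreamsBuilder): StreamsBuilder =
    builder.addGlobalStore(
      Stores
        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(StoreName), Serdes.String(), Serdes.String())
        .withLoggingDisabled(), // required for global stores
      LookupTopic,
      Consumed.`with`(Serdes.String(), Serdes.String()),
      new ProcessorSupplier[String, String] { override def get() = new StoreUpdater })

  def buildTopology(): Topology = register(new StreamsBuilder()).build()
}
```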
After you have added a global store, you can also add a Transformer, and the Transformer can access the store. Note that it's not required to connect a global store to make it available (only "regular" stores need to be connected, by passing their names to transform()). Also note that a Transformer only gets read access to global stores.
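Putting this together, a sketch of a DSL topology in which a Transformer reads the global store without the store's name being passed to transform(), and emits zero or more records per input through its ProcessorContext. That covers the flatMap-with-new-keys-and-values case from the question. It assumes Kafka Streams 2.x (the Transformer API was deprecated in later releases); the topics, the store name and the comma-separated value format are hypothetical. The store is typed as ReadOnlyKeyValueStore inside the Transformer to reflect that it only gets read access.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Produced, Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.{KeyValueStore, ReadOnlyKeyValueStore, Stores}

// Sketch assuming Kafka Streams 2.x; all topic and store names are hypothetical.
object TransformerEnrichment {
  val StoreName = "lookup-store"

  // Maintains the global store by copying records as-is.
  class StoreUpdater extends AbstractProcessor[String, String] {
    private var store: KeyValueStore[String, String] = _
    override def init(ctx: ProcessorContext): Unit = {
      super.init(ctx)
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[KeyValueStore[String, String]]
    }
    override def process(key: String, value: String): Unit = store.put(key, value)
  }

  // flatMap-like Transformer: reads the global store, emits zero or more records.
  class Enricher extends Transformer[String, String, KeyValue[String, String]] {
    private var ctx: ProcessorContext = _
    private var store: ReadOnlyKeyValueStore[String, String] = _

    override def init(context: ProcessorContext): Unit = {
      ctx = context
      val s: StateStore = context.getStateStore(StoreName) // global: no connection needed
      store = s.asInstanceOf[ReadOnlyKeyValueStore[String, String]]
    }

    override def transform(key: String, value: String): KeyValue[String, String] = {
      val extra = store.get(key)
      if (extra != null && value != null) {
        // Hypothetical format: one output record per comma-separated token.
        value.split(",").foreach(token => ctx.forward(s"$key-$token", s"$token:$extra"))
      }
      null // returning null forwards nothing beyond the records emitted above
    }

    override def close(): Unit = ()
  }

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    builder.addGlobalStore(
      Stores
        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(StoreName), Serdes.String(), Serdes.String())
        .withLoggingDisabled(), // global stores must not have a changelog topic
      "lookup-topic",
      Consumed.`with`(Serdes.String(), Serdes.String()),
      new ProcessorSupplier[String, String] { override def get() = new StoreUpdater })
    builder
      .stream("input-topic", Consumed.`with`(Serdes.String(), Serdes.String()))
      // No store names passed: the global store is visible without connecting it.
      .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
        override def get() = new Enricher
      })
      .to("output-topic", Produced.`with`(Serdes.String(), Serdes.String()))
    builder.build()
  }
}
```

Returning null from transform() and emitting through ctx.forward() is what lets a single input record produce several output records with different keys, which a plain one-to-one return value cannot do.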
Upvotes: 2