Ryan Nosworthy

Reputation: 21

Adding a global store for a transformer to consume

Is there a way to add a global store for a Transformer to use? In the docs for Transformer it says:

"Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and computes zero or more output records. In order to assign a state, the state must be created and registered beforehand via stores added via addStateStore or addGlobalStore before they can be connected to the Transformer"

Yet the API for addGlobalStore only takes a ProcessorSupplier?

addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
                     topic: String,
                     consumed: Consumed[_, _],
                     stateUpdateSupplier: ProcessorSupplier[_, _])

My end goal is to use the Kafka Streams DSL with a Transformer, since I need to flatMap and transform both keys and values before writing to my output topic. I do not have a Processor in my topology, though.

I would expect something like this:

addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, _], stateUpdateSupplier: TransformerSupplier[_, _])

Upvotes: 2

Views: 1978

Answers (2)

Piyush Verma

Reputation: 399

Use a Processor instead of a Transformer for all the transformations you want to perform on the input topic whenever you need to look up data from a GlobalStateStore. Use context.forward(key, value, childName) to send the data to downstream nodes; it may be called multiple times in process() and punctuate() to send multiple records downstream. (Since Kafka 2.0, the childName overload is deprecated in favor of context.forward(key, value, To.child(childName)).) If you need to update the GlobalStateStore, do so only in the Processor passed to addGlobalStore(..), because a GlobalStreamThread is associated with the GlobalStateStore and keeps the store's state consistent across all running Kafka Streams instances.
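A minimal sketch of this approach, assuming a Kafka Streams 2.x-era Processor API (the one the question's addGlobalStore signature belongs to) used from Scala. The store name "lookup-store", the topic and sink names, and the comma-splitting and routing logic are all hypothetical. Because KStream#process() is terminal in 2.x, the processor is wired with the low-level Topology API so its output can reach sink nodes, and it uses the To.child(...) form of forward(). The global store itself still has to be registered via addGlobalStore(); that part is not shown.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, StateStore, To}
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

object LookupProcessorTopology {
  // Hypothetical names for the global store and the two child (sink) nodes.
  val StoreName = "lookup-store"
  val MatchedSink = "matched-sink"
  val UnmatchedSink = "unmatched-sink"

  // Hypothetical routing rule: pick the child node based on whether the lookup hit.
  def childFor(enrichment: Option[String]): String =
    if (enrichment.isDefined) MatchedSink else UnmatchedSink

  class LookupProcessor extends Processor[String, String] {
    private var context: ProcessorContext = _
    private var store: ReadOnlyKeyValueStore[String, String] = _

    override def init(ctx: ProcessorContext): Unit = {
      context = ctx
      // Fetch the (already registered) global store by name; it is only read here.
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[ReadOnlyKeyValueStore[String, String]]
    }

    override def process(key: String, value: String): Unit = {
      val enrichment = Option(store.get(key))
      val child = childFor(enrichment)
      // forward() may be called any number of times per input record (flatMap-style).
      value.split(",").filter(_.nonEmpty).foreach { token =>
        context.forward(token, enrichment.getOrElse(value), To.child(child))
      }
    }

    override def close(): Unit = ()
  }

  // Source -> lookup processor -> two sinks; the global store registration is omitted.
  def buildTopology(): Topology = {
    val topology = new Topology()
    topology.addSource("source", "input-topic")
    topology.addProcessor("lookup", new ProcessorSupplier[String, String] {
      override def get(): Processor[String, String] = new LookupProcessor
    }, "source")
    topology.addSink(MatchedSink, "matched-topic", "lookup")
    topology.addSink(UnmatchedSink, "unmatched-topic", "lookup")
    topology
  }
}
```

The two sinks are only there to show why a child name matters: each forward() call chooses which downstream node receives the record.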

Upvotes: 2

Matthias J. Sax

Reputation: 62350

The Processor that is passed into addGlobalStore() is used to maintain (i.e., write to) the store. Note that it's currently expected that this Processor copies the data as-is into the store (cf. https://issues.apache.org/jira/browse/KAFKA-7663).
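A sketch of registering a global store whose update Processor copies each record into the store as-is, assuming the Kafka Streams 2.x Scala wrapper with the addGlobalStore signature quoted in the question. The store name "lookup-store" and the topic "lookup-topic" are hypothetical. Logging is disabled on the store builder because, for a global store, the source topic already serves as the changelog, and a global store must not have logging enabled.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, StateStore}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

object GlobalStoreSetup {
  // Hypothetical names; replace with your own store and topic.
  val StoreName = "lookup-store"
  val LookupTopic = "lookup-topic"

  // Maintains the global store: every record read from LookupTopic is written as-is.
  class GlobalStoreUpdater extends Processor[String, String] {
    private var store: KeyValueStore[String, String] = _

    override def init(context: ProcessorContext): Unit = {
      val s: StateStore = context.getStateStore(StoreName)
      store = s.asInstanceOf[KeyValueStore[String, String]]
    }

    override def process(key: String, value: String): Unit = store.put(key, value)

    override def close(): Unit = ()
  }

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    // The source topic acts as the changelog, so logging is disabled on the store.
    val storeBuilder = Stores
      .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(StoreName), Serdes.String(), Serdes.String())
      .withLoggingDisabled()
    builder.addGlobalStore(
      storeBuilder,
      LookupTopic,
      Consumed.`with`(Serdes.String(), Serdes.String()),
      new ProcessorSupplier[String, String] {
        override def get(): Processor[String, String] = new GlobalStoreUpdater
      }
    )
    // Streams that read StoreName (e.g. via transform()) would be added to `builder` here.
    builder.build()
  }
}
```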

After you have added a global store, you can also add a Transformer, and the Transformer can access the store. Note that you don't need to connect a global store to the Transformer to make it available (only "regular" stores need to be connected by passing their names). Also note that a Transformer only gets read access to global stores.
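A minimal sketch of a Transformer that reads a global store, assuming a 2.x Scala wrapper whose KStream#transform accepts a TransformerSupplier, and assuming a global store named "lookup-store" (hypothetical) has already been registered via addGlobalStore(). The expand() logic is a hypothetical stand-in for the flatMap from the question. No store name is passed to transform(), since global stores need not be connected, and the store is only read. Forwarding inside transform() and then returning null lets one input record produce zero or more output records with new keys and values.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.{ProcessorContext, StateStore}
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

object GlobalLookupTransform {
  // Hypothetical: must match the name used when the global store was registered.
  val StoreName = "lookup-store"

  // Hypothetical flatMap logic: one output record per comma-separated token,
  // re-keyed by the token, with the looked-up value (or "unknown") as the new value.
  def expand(value: String, enrichment: Option[String]): Seq[(String, String)] =
    value.split(",").toSeq.filter(_.nonEmpty).map(token => token -> enrichment.getOrElse("unknown"))

  class LookupTransformer extends Transformer[String, String, KeyValue[String, String]] {
    private var context: ProcessorContext = _
    private var store: ReadOnlyKeyValueStore[String, String] = _

    override def init(ctx: ProcessorContext): Unit = {
      context = ctx
      // The global store is reachable without being connected; only reads are allowed.
      val s: StateStore = ctx.getStateStore(StoreName)
      store = s.asInstanceOf[ReadOnlyKeyValueStore[String, String]]
    }

    override def transform(key: String, value: String): KeyValue[String, String] = {
      val enrichment = Option(store.get(key))
      // Forward zero or more records, then return null so no extra record is emitted.
      expand(value, enrichment).foreach { case (k, v) => context.forward(k, v) }
      null
    }

    override def close(): Unit = ()
  }

  // No state store names are passed: global stores do not need to be connected.
  def withLookup(stream: KStream[String, String]): KStream[String, String] =
    stream.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
      override def get(): Transformer[String, String, KeyValue[String, String]] = new LookupTransformer
    })
}
```

Usage would be along the lines of withLookup(builder.stream[String, String]("input-topic")).to("output-topic") on the same builder that registered the global store.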

Upvotes: 2
