I am trying to use a GlobalKTable to store data from a Kafka topic during the startup of my application. The GlobalKTable and the KStream both consume the same topic, and I need to access the record headers in the GlobalKTable for processing. After the initial data load, the KStream handles real-time records; these may be new records or records that already exist in the GlobalKTable. The challenge is that I can't access the Kafka headers in the GlobalKTable configuration. Here's a summary of my setup:
I have two configuration classes, one for the GlobalKTable and one for the KStream. I need to start the KStream only after the GlobalKTable is fully loaded. Here is the GlobalKTableConfig class, where I want to access the headers:
```java
@Bean(name = "createGlobalKTable")
public GlobalKTable<String, GenericRecord> createGlobalKTable(
        @Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean) throws Exception {
    StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", schemaRegistryUrl);
    serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
    serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);
    return streamsBuilder.globalTable(ediLegTopic,
            Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(storeName + "-global")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(valueGenericAvroSerde));
}
```
And this is the KStreamConfig class:
```java
@Bean
public StreamsBuilderFactoryBean kStreamBuilder(
        @Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config,
        GlobalKTableService globalKTableService) {
    StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(config, new CleanupConfig(false, false));
    factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            log.info("State changed from {} to {}", oldState, newState);
            if (newState == KafkaStreams.State.RUNNING) {
                // Store iterators must be closed, so use try-with-resources.
                try (KeyValueIterator<String, GenericRecord> keyValueIterator =
                             globalKTableService.getAllValuesFromGlobalKTable()) {
                    // process each record according to the logic
                }
            }
        });
    });
    return factoryBean;
}

@Bean
public KStream<String, GenericRecord> createKStream(
        @Qualifier("kStreamBuilder") StreamsBuilderFactoryBean kStreamsBuilderFactoryBean) throws Exception {
    StreamsBuilder kStreamBuilder = kStreamsBuilderFactoryBean.getObject();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", schemaRegistryUrl);
    serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
    serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);
    KStream<String, GenericRecord> kStream = kStreamBuilder.stream(ediLegTopic, Consumed.with(Serdes.String(), valueGenericAvroSerde));
    StoreBuilder<KeyValueStore<String, GenericRecord>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName + "-global"),
            Serdes.String(),
            valueGenericAvroSerde
    );
    kStreamBuilder.addStateStore(storeBuilder);
    KStream<String, GenericRecord> processedStream = kStream.process(
            () -> new CustomProcessor(globalKTableService, logicProvider), storeName + "-global");
    return processedStream;
}
```
I have a processor that applies the logic to each real-time record:
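```java
import java.util.Objects;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomProcessor implements Processor<String, GenericRecord, String, GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(CustomProcessor.class);

    private ProcessorContext<String, GenericRecord> context;
    private final GlobalKTableService globalKTableService;
    private final LogicProvider logicProvider;

    public CustomProcessor(GlobalKTableService globalKTableService, LogicProvider logicProvider) {
        this.globalKTableService = globalKTableService;
        this.logicProvider = logicProvider;
    }

    @Override
    public void init(ProcessorContext<String, GenericRecord> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, GenericRecord> processingRecord) {
        String key = processingRecord.key();
        GenericRecord value = processingRecord.value();
        // The stream record carries its headers; the GlobalKTable lookup below does not.
        Headers headers = processingRecord.headers();
        GenericRecord globalTableValue = globalKTableService.getValueFromGlobalKTable(key);
        // Objects.equals avoids a NullPointerException on tombstones (null values).
        if (!Objects.equals(value, globalTableValue)) {
            log.info("No match in GlobalKTable. Forwarding stream record with key: {}", key);
            logicProvider.applyLogic(new KeyValue<>(key, value), headers);
            context.forward(processingRecord.withValue(value));
        } else {
            log.info("Match found in GlobalKTable for key: {}. Skipping forward.", key);
        }
    }

    @Override
    public void close() {
        log.info("Closing CustomProcessor");
    }
}
```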
How can I access the Kafka headers in the GlobalKTable configuration? The question Kafka Streams GlobalKTable and accessing the record headers covers a related topic, but I don't understand what the answer there means.
KTables and GlobalKTables do not store headers; they store only the key, the value, and the record timestamp.
To access the headers, you would need to pre-process the data and copy the headers into the value. Such a pre-processing step means reading the original topic, writing the modified data into a second topic, and populating the GlobalKTable from that second topic.
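A minimal sketch of that pre-processing approach, under loudly stated assumptions: it uses String keys and values instead of the Avro `GenericRecord` values from the question, and it copies the headers into the value with a made-up `value|key=val;...` encoding. The class `HeaderCopyTopology` and the topic and store names are hypothetical. With Avro you would instead add a field for the headers to the value schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public final class HeaderCopyTopology {

    // Hypothetical names; substitute your own topics and store.
    public static final String SOURCE_TOPIC = "edi-leg";
    public static final String ENRICHED_TOPIC = "edi-leg-with-headers";
    public static final String STORE_NAME = "edi-leg-global";

    // Hypothetical encoding "value|k1=v1;k2=v2"; with Avro, use a schema field instead.
    static String copyHeadersIntoValue(Record<String, String> record) {
        StringJoiner joiner = new StringJoiner(";");
        for (Header header : record.headers()) {
            String v = header.value() == null ? "" : new String(header.value(), StandardCharsets.UTF_8);
            joiner.add(header.key() + "=" + v);
        }
        return record.value() + "|" + joiner;
    }

    /** Pre-processing step: copies the headers of each record into its value. */
    static final class HeaderCopier implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                context.forward(record); // pass tombstones through unchanged
            } else {
                context.forward(record.withValue(copyHeadersIntoValue(record)));
            }
        }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Read the original topic, copy headers into the value, write to a second topic.
        builder.stream(SOURCE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .process(HeaderCopier::new)
                .to(ENRICHED_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        // Populate the GlobalKTable from the second topic; the headers now live in the value.
        builder.globalTable(ENRICHED_TOPIC,
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
        return builder.build();
    }
}
```

The sketch puts both steps in one topology for brevity; the pre-processing could just as well run as a separate application writing to the second topic.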
If using a second topic is a concern, you should be able to avoid it if you are using Kafka Streams 3.8+. You can use `addGlobalStore()` (instead of `globalTable()`, which is only syntactic sugar on top of `addGlobalStore()`) and implement a custom `Processor` that calls `put()` to write the data into the global store. This lets you add the headers on the fly before calling `put()`.
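 
A sketch of the `addGlobalStore()` variant, under the same assumptions as before: String serdes instead of Avro, and a hypothetical class name (`HeaderAwareGlobalStore`), topic and store names, and `value|key=val;...` header encoding. It assumes Kafka Streams 3.8+, per the version noted above. The custom `Processor` receives the full `Record`, including its headers, and writes the enriched value with `put()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public final class HeaderAwareGlobalStore {

    // Hypothetical names; substitute your own topic and store.
    public static final String TOPIC = "edi-leg";
    public static final String STORE_NAME = "edi-leg-global";

    /** Custom global-store processor: adds the headers to the value, then calls put(). */
    static final class HeaderCopyingUpdater implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                store.delete(record.key()); // treat tombstones as deletes
                return;
            }
            // Hypothetical encoding "value|k1=v1;k2=v2"; with Avro, use a schema field instead.
            StringJoiner joiner = new StringJoiner(";");
            for (Header header : record.headers()) {
                String v = header.value() == null ? "" : new String(header.value(), StandardCharsets.UTF_8);
                joiner.add(header.key() + "=" + v);
            }
            store.put(record.key(), record.value() + "|" + joiner);
        }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, Void, Void> updater = HeaderCopyingUpdater::new;
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                                Stores.persistentKeyValueStore(STORE_NAME),
                                Serdes.String(),
                                Serdes.String())
                        // Global stores are rebuilt from the source topic, so no changelog.
                        .withLoggingDisabled(),
                TOPIC,
                Consumed.with(Serdes.String(), Serdes.String()),
                updater);
        return builder.build();
    }
}
```

Because the processor calls `put()` itself, it also has to handle tombstones (null values), which `globalTable()` would otherwise handle for you; the sketch deletes the key in that case.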