Misa

Reputation: 43

How to Access Headers in a GlobalKTable in Kafka Streams

I am trying to use a GlobalKTable to load data from a Kafka topic during application startup. The GlobalKTable and a KStream consume the same topic, and I need access to the record headers in the GlobalKTable for processing. After the initial load, the KStream handles real-time records; a real-time record may be new, or it may already exist in the GlobalKTable. The problem is that I can't access the Kafka headers in the GlobalKTable configuration. Here is a summary of my setup:

I have two configuration classes: one for the GlobalKTable and one for the KStream. The KStream must be initialized only after the GlobalKTable is fully loaded. Here is the GlobalKTableConfig class, where I want to access the headers:

@Bean(name = "createGlobalKTable")
public GlobalKTable<String, GenericRecord> createGlobalKTable(@Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean) throws Exception {
    StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", schemaRegistryUrl);
    serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
    serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);

    return streamsBuilder.globalTable(ediLegTopic,
         Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(storeName + "-global")
             .withKeySerde(Serdes.String())
             .withValueSerde(valueGenericAvroSerde));

}

And this is the KStreamConfig class:

    @Bean
    public StreamsBuilderFactoryBean kStreamBuilder(
        @Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config,
        GlobalKTableService globalKTableService) {
        StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(config, new CleanupConfig(false, false));
        factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
            kafkaStreams.setStateListener((newState, oldState) -> {
                log.info("State changed from " + oldState + " to " + newState);
                if (newState == KafkaStreams.State.RUNNING) {
                    // KeyValueIterator holds store resources and must be closed after use
                    try (KeyValueIterator<String, GenericRecord> keyValueIterator = globalKTableService.getAllValuesFromGlobalKTable()) {
                        // process each record according to the logic
                    }
                }
            });
        });
        return factoryBean;
    }

    @Bean
    public KStream<String, GenericRecord> createKStream(
        @Qualifier("kStreamBuilder") StreamsBuilderFactoryBean kStreamsBuilderFactoryBean) throws Exception {
        StreamsBuilder kStreamBuilder = kStreamsBuilderFactoryBean.getObject();

        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", schemaRegistryUrl);
        serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
        serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

        final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
        valueGenericAvroSerde.configure(serdeConfig, false);
        KStream<String, GenericRecord> kStream = kStreamBuilder.stream(ediLegTopic, Consumed.with(Serdes.String(), valueGenericAvroSerde));

        StoreBuilder<KeyValueStore<String, GenericRecord>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName + "-global"),
            Serdes.String(),
            valueGenericAvroSerde
        );
        kStreamBuilder.addStateStore(storeBuilder);

        KStream<String, GenericRecord> processedStream = kStream.process(() -> new CustomProcessor(globalKTableService, logicProvider), storeName + "-global");
        return processedStream;
    }
}

I have a processor that applies the logic to each real-time record:


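import java.util.Objects;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomProcessor implements Processor<String, GenericRecord, String, GenericRecord> {
    // SLF4J logger, matching the {} placeholders used below
    private static final Logger log = LoggerFactory.getLogger(CustomProcessor.class);
    private ProcessorContext<String, GenericRecord> context;
    private final GlobalKTableService globalKTableService;
    private final LogicProvider logicProvider;

    public CustomProcessor(GlobalKTableService globalKTableService, LogicProvider logicProvider) {
        this.globalKTableService = globalKTableService;
        this.logicProvider = logicProvider;
    }

    @Override
    public void init(ProcessorContext<String, GenericRecord> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, GenericRecord> processingRecord) {
        String key = processingRecord.key();
        GenericRecord value = processingRecord.value();
        Headers headers = processingRecord.headers();

        GenericRecord globalTableValue = globalKTableService.getValueFromGlobalKTable(key);

        // Objects.equals is null-safe: value is null for tombstone records
        if (!Objects.equals(value, globalTableValue)) {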
            log.info("No match in GlobalKTable. Forwarding stream record with key: {}", key);
            logicProvider.applyLogic(new KeyValue<>(key, value), headers);
            context.forward(processingRecord.withValue(value));
        } else {
            log.info("Match found in GlobalKTable for key: {}. Skipping forward.", key);
        }
    }

    @Override
    public void close() {
        log.info("Closing CustomProcessor");
    }
}

How can I access the Kafka headers in the GlobalKTable configuration? The question "Kafka Streams GlobalKTable and accessing the record headers" covers a related topic, but I don't understand what the answer there means.

Upvotes: 0

Views: 77

Answers (1)

Matthias J. Sax

Reputation: 62285

KTables and GlobalKTables do not store header information; they store only the key, the value, and the record timestamp.

To be able to access the headers, you would need to pre-process the data to copy the headers into the value. Such a pre-processing step implies that you read the original topic, write the modified data into a second topic, and populate the GlobalKTable from that second topic.
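
A minimal sketch of the two-topic approach, assuming Kafka Streams 3.3+ (for `processValues()`), Avro's `SchemaBuilder`, and header values that are UTF-8 text. The class `HeadersIntoValueTopology`, the method `withHeaders`, the `ValueWithHeaders` wrapper schema, and the topic names are hypothetical: the wrapper simply keeps the original record in a `value` field and a string copy of the headers in a `headers` map. The `valueSerde` (for example the `GenericAvroSerde` from the question) must be able to serialize this wrapper; with Schema Registry's default subject naming it would be registered under the second topic's value subject.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public final class HeadersIntoValueTopology {

    // Hypothetical wrapper: the original value plus a string copy of the headers.
    public static GenericRecord withHeaders(GenericRecord value, Headers headers) {
        Map<String, String> headerMap = new HashMap<>();
        for (Header header : headers) {
            // Assumes UTF-8 text headers; null values are skipped, a repeated key keeps its last value.
            if (header.value() != null) {
                headerMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
            }
        }
        Schema schema = SchemaBuilder.record("ValueWithHeaders").namespace("example")
            .fields()
            .name("value").type(value.getSchema()).noDefault()
            .name("headers").type().map().values().stringType().noDefault()
            .endRecord();
        GenericRecord wrapped = new GenericData.Record(schema);
        wrapped.put("value", value);
        wrapped.put("headers", headerMap);
        return wrapped;
    }

    // Copies each record's headers into its value; the key is unchanged.
    static final class HeadersIntoValue implements FixedKeyProcessor<String, GenericRecord, GenericRecord> {
        private FixedKeyProcessorContext<String, GenericRecord> context;

        @Override
        public void init(FixedKeyProcessorContext<String, GenericRecord> context) {
            this.context = context;
        }

        @Override
        public void process(FixedKeyRecord<String, GenericRecord> record) {
            if (record.value() == null) {
                context.forward(record); // pass tombstones through so deletes reach the table
                return;
            }
            context.forward(record.withValue(withHeaders(record.value(), record.headers())));
        }
    }

    public static GlobalKTable<String, GenericRecord> build(StreamsBuilder builder, String sourceTopic,
                                                            String enrichedTopic, Serde<GenericRecord> valueSerde) {
        FixedKeyProcessorSupplier<String, GenericRecord, GenericRecord> supplier = HeadersIntoValue::new;
        // Step 1: read the original topic and write header-enriched values to a second topic.
        builder.stream(sourceTopic, Consumed.with(Serdes.String(), valueSerde))
            .processValues(supplier)
            .to(enrichedTopic, Produced.with(Serdes.String(), valueSerde));
        // Step 2: populate the GlobalKTable from the second topic.
        return builder.globalTable(enrichedTopic, Consumed.with(Serdes.String(), valueSerde));
    }
}
```

With this layout, a table lookup returns the wrapper, so code that compares against the table (such as `CustomProcessor`) would read `get("value")` for the original record and `get("headers")` for the copied headers.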

If using a second topic is a concern, you should be able to avoid it if you are using Kafka Streams 3.8 or newer. You can use addGlobalStore() instead of globalTable() (which is only syntactic sugar on top of addGlobalStore()) and implement a custom Processor that calls put() to write the data into the global store. This allows you to add the headers on the fly before calling put().
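
A sketch of the addGlobalStore() variant, assuming Kafka Streams 3.8+ as stated above. `GlobalStoreWithHeaders`, `GlobalStoreUpdater`, `withHeaders`, and the `ValueWithHeaders` schema are hypothetical names; the stored value is an Avro wrapper holding the original record plus a string map of the headers (assumed UTF-8). Global stores are rebuilt from their source topic rather than a changelog, so the store builder disables logging. The store's value serde must be able to serialize the wrapper; with a Schema Registry serde, check which subject the wrapper schema is registered under, since it differs from the source topic's value schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public final class GlobalStoreWithHeaders {

    // Hypothetical wrapper: the original value plus a string copy of the headers.
    public static GenericRecord withHeaders(GenericRecord value, Headers headers) {
        Map<String, String> headerMap = new HashMap<>();
        for (Header header : headers) {
            // Assumes UTF-8 text headers; null values are skipped, a repeated key keeps its last value.
            if (header.value() != null) {
                headerMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
            }
        }
        Schema schema = SchemaBuilder.record("ValueWithHeaders").namespace("example")
            .fields()
            .name("value").type(value.getSchema()).noDefault()
            .name("headers").type().map().values().stringType().noDefault()
            .endRecord();
        GenericRecord wrapped = new GenericData.Record(schema);
        wrapped.put("value", value);
        wrapped.put("headers", headerMap);
        return wrapped;
    }

    // Global-store update processor: it writes to the store itself and forwards nothing.
    static final class GlobalStoreUpdater implements Processor<String, GenericRecord, Void, Void> {
        private final String storeName;
        private KeyValueStore<String, GenericRecord> store;

        GlobalStoreUpdater(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(storeName);
        }

        @Override
        public void process(Record<String, GenericRecord> record) {
            if (record.key() == null) {
                return; // a key-value store cannot hold a null key
            }
            if (record.value() == null) {
                store.delete(record.key()); // tombstone: remove the entry
            } else {
                // Headers are added on the fly, right before put().
                store.put(record.key(), withHeaders(record.value(), record.headers()));
            }
        }
    }

    public static StreamsBuilder addGlobalStoreWithHeaders(StreamsBuilder builder, String topic,
                                                           String storeName, Serde<GenericRecord> valueSerde) {
        StoreBuilder<KeyValueStore<String, GenericRecord>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), valueSerde)
            .withLoggingDisabled(); // global stores must not have a changelog topic
        ProcessorSupplier<String, GenericRecord, Void, Void> updater = () -> new GlobalStoreUpdater(storeName);
        return builder.addGlobalStore(storeBuilder, topic, Consumed.with(Serdes.String(), valueSerde), updater);
    }
}
```

Note that addGlobalStore() returns the `StreamsBuilder`, not a `GlobalKTable`, so a bean like `createGlobalKTable` would need to expose the store (queried by `storeName`) rather than a table.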

Upvotes: 0
