Amandeep Singh

Reputation: 65

Kafka KTable also streaming duplicate updates

I want to process the KTable (created with KStream.reduce()) changelog stream, i.e. any change in the value of a key in the KTable. But it seems that even when the same key-value pair is sent to the KTable multiple times, it is forwarded downstream every time. I need an update to be sent only when the value for a key actually changes.

groupByKey(Grouped.with(new Serdes.LongSerde(), new Serdes.LongSerde()))
        .reduce(new Reducer<Long>() {
            @Override
            public Long apply(Long t1, Long t2) {
                return t2;
            }
        })
        .toStream()
        // for each update to the ID, send an update downstream
        .foreach((key, value) -> sendUpdate(key));

Upvotes: 2

Views: 1945

Answers (2)

fsarradin

Reputation: 193

Kafka Streams provides two semantics: emit-on-update and emit-on-window-close.

KIP-557 proposed adding an emit-on-change semantic based on a byte-array comparison of the data. It was implemented in Kafka Streams 2.6 and later removed due to "potential data loss".

Nevertheless, I have developed an implementation of the emit-on-change semantic using the Kafka Streams DSL.

The idea is to convert a KStream with emit-on-update semantics into a KStream with emit-on-change semantics. You can apply this implementation to the source KStream you use to create the KTable, or to the KTable after applying .toStream().

This implementation implicitly creates a state store whose value contains the KStream data plus a flag that indicates whether an update should be emitted. The flag is set in the aggregate operation and is based on Object#equals for the comparison, but you could change the implementation to use a Comparator instead.

Here is the withEmitOnChange method that changes the semantics of a KStream. You might have to specify a serde for the EmitOnChangeState data structure (see below).

public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
    return streams
            .groupByKey()
            .aggregate(
                    () -> (EmitOnChangeState<V>) null,              // no previous value yet
                    (k, data, state) -> {
                        if (state == null) {
                            return new EmitOnChangeState<>(data, true); // first value: emit
                        } else {
                            return state.merge(data);                   // emit only on change
                        }
                    }
            )
            .toStream()
            .filter((k, state) -> state.shouldEmit) // drop unchanged updates
            .mapValues(state -> (V) state.data);    // unwrap the original value
}

Here is the data structure that is stored in state store and used to check if an update should be emitted.

public static class EmitOnChangeState<T> {
    public final T data;
    public final boolean shouldEmit;
    public EmitOnChangeState(T data, boolean shouldEmit) {
        this.data = data;
        this.shouldEmit = shouldEmit;
    }
    public EmitOnChangeState<T> merge(T newData) {
        // Emit only when the incoming value differs from the stored one.
        return new EmitOnChangeState<>(newData, !Objects.equals(data, newData));
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
        return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
    }
    @Override
    public int hashCode() {
        return Objects.hash(data, shouldEmit);
    }
}
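As a sanity check of the flag logic outside Kafka, here is a minimal standalone version of the state class (note that shouldEmit must be the negation of Objects.equals, so that equal values are suppressed and changed values are emitted):

```java
import java.util.Objects;

public class EmitOnChangeDemo {
    // Standalone copy of the state class, inlined so the example compiles on its own.
    public static class EmitOnChangeState<T> {
        public final T data;
        public final boolean shouldEmit;

        public EmitOnChangeState(T data, boolean shouldEmit) {
            this.data = data;
            this.shouldEmit = shouldEmit;
        }

        public EmitOnChangeState<T> merge(T newData) {
            // Emit only when the incoming value differs from the stored one.
            return new EmitOnChangeState<>(newData, !Objects.equals(data, newData));
        }
    }

    public static void main(String[] args) {
        EmitOnChangeState<Long> state = new EmitOnChangeState<>(42L, true);
        System.out.println(state.merge(42L).shouldEmit); // duplicate -> false
        System.out.println(state.merge(7L).shouldEmit);  // change    -> true
    }
}
```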

Usage:

KStream<ProductKey, Product> products = builder.stream("product-topic");

withEmitOnChange(products)
  .to("out-product-topic"); // output topic with emit-on-change semantic

Upvotes: 2

Tuyen Luong

Reputation: 1366

This is the default behavior of KTable#toStream(): it converts the changelog topic to a KStream, so the downstream operator of reduce gets an update every time the upstream reduce operator receives a message.

You can achieve your desired behavior using the Processor API; in this case we use KStream#transformValues().

First, register a KeyValueStore to hold the latest value for each key:

// you don't need to add number_store if your KTable is already materialized as number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

Then we create an ExtractIfValueChangedTransformer that returns the value of a new message only if it has changed, and null otherwise:

public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}
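The transformer's logic can be exercised without a Kafka cluster. Here is a plain-Java stand-in for the transform() method above, using a HashMap in place of the KeyValueStore (the class and method names are just for this sketch):

```java
import java.util.HashMap;
import java.util.Map;

public class DedupDemo {
    // HashMap stands in for the "number_store" KeyValueStore.
    static final Map<Long, Long> store = new HashMap<>();

    // Returns the new value only when it differs from the last value seen for the key;
    // Map.put conveniently returns the previous value, mirroring get-then-put on the store.
    static Long transform(Long key, Long newValue) {
        Long oldValue = store.put(key, newValue);
        if (oldValue == null) return newValue;                // first value: emit
        return oldValue.equals(newValue) ? null : newValue;   // unchanged: filter out
    }

    public static void main(String[] args) {
        System.out.println(transform(1L, 10L)); // 10   (first value, emitted)
        System.out.println(transform(1L, 10L)); // null (duplicate, filtered)
        System.out.println(transform(1L, 20L)); // 20   (changed, emitted)
    }
}
```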

Upvotes: 2
