tintin

Reputation: 5867

KTable not deduplicating the incoming records with same keys

I am trying to deduplicate records by reading the input topic as a KTable and sinking it to an output topic, but the KTable is still sinking the duplicate records to the output topic. I am not sure where I am going wrong.

Here is my application.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          process-in-0: input.topic
          process-out-0: output.topic
        definition: process
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                materializedAs: incoming-store
          binder:
            application-id: spring-cloud-uppercase-app
            brokers: localhost:9092
            configuration:
              commit:
                interval:
                  ms: 1000
              state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde

As per the Spring Cloud Stream Kafka Streams documentation on state stores, I have materialized the incoming KTable as the incoming-store state store above.

The process() bean function takes the input topic as a KTable and sinks it to the output topic:


    @Bean
    public Function<KTable<String, String>, KStream<String, String>> process(){
        return table -> table
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }

For a given input of 4 records:

key=111, value="a"
key=111, value="a"
key=222, value="b"
key=111, value="a"

I am expecting to get only 2 records:

key=111, value="a"
key=222, value="b"

But I am getting all 4 records. Any help would be really appreciated!

Upvotes: 1

Views: 1197

Answers (2)

Felipe

Reputation: 7583

You can group the events by key and aggregate them. Although you are not concatenating the strings during the aggregation, the aggregate transformation is used only to emit one value per key that you are grouping on (111 or 222). Your use case is just a distinct aggregation: every time you aggregate you receive (key, value, aggregate), and you keep only the value, which will be the latest value for that key.

@Slf4j
@Configuration
@EnableAutoConfiguration
public class KafkaAggFunctionalService {

    @Bean
    public Function<KTable<String, String>, KStream<String, String>> aggregate() {
        return table -> table
                .toStream()
                .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(() -> "",
                        (key, value, aggregate) -> value,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()))
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }
}

This git repo has a lot of examples. The one that looks very similar to yours is this.
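
As a side note, if you only want the latest value per key and do not need an initializer, the same idea can also be sketched with reduce instead of aggregate. The snippet below is just an illustrative variation on the class above (the class name KafkaDistinctByKeyService is made up), keeping the same store name and serdes:

    import java.util.function.Function;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import lombok.extern.slf4j.Slf4j;

    // hypothetical variation of the configuration class above
    @Slf4j
    @Configuration
    public class KafkaDistinctByKeyService {

        @Bean
        public Function<KTable<String, String>, KStream<String, String>> aggregate() {
            return table -> table
                    .toStream()
                    // group records by their existing key (111, 222, ...)
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                    // keep only the latest value seen for each key
                    .reduce((previous, latest) -> latest,
                            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
                                    .withKeySerde(Serdes.String())
                                    .withValueSerde(Serdes.String()))
                    .toStream()
                    .peek((k, v) -> log.info("Received key={}, value={}", k, v));
        }
    }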

Upvotes: 2

Arpit Saxena

Reputation: 56

I think the problem you are trying to solve is well served by a compacted topic. Once you deliver data with the same key to a compacted topic and compaction is enabled at the broker level (which it is by default), each broker starts a compaction manager thread and a number of compaction threads; these are responsible for performing the compaction tasks. Compaction does nothing but keep the latest value for each key and clean up the older (dirty) entries.
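
For example, a compacted topic can be created up front with Kafka's AdminClient. The following is only a minimal sketch, assuming a single-broker setup; the topic name, partition count, and replication factor are placeholders:

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CompactedTopicCreator {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // cleanup.policy=compact tells the broker to retain only the latest record per key
                NewTopic outputTopic = new NewTopic("output.topic", 1, (short) 1)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Set.of(outputTopic)).all().get();
            }
        }
    }

Keep in mind that compaction runs asynchronously, so consumers may still see older values for a key until the log cleaner has processed the segment.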

Refer to this Kafka documentation for more details.

Upvotes: 0
