Reputation: 5867
I am trying to deduplicate records by consuming the input topic as a KTable and sinking it to the output topic. But the KTable is still sinking duplicate records to the output topic. I am not sure where I am going wrong.
Here is my application.yml
spring:
  cloud:
    stream:
      function:
        bindings:
          process-in-0: input.topic
          process-out-0: output.topic
        definition: process
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                materializedAs: incoming-store
          binder:
            application-id: spring-cloud-uppercase-app
            brokers: localhost:9092
            configuration:
              commit:
                interval:
                  ms: 1000
              state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
As per the Spring Cloud Stream Kafka Streams documentation on state stores, I have added the materialized view above as incoming-store.
The process() bean function takes the input topic as a KTable and sinks it to the output topic:
@Bean
public Function<KTable<String, String>, KStream<String, String>> process() {
    return table -> table
            .toStream()
            .peek((k, v) -> log.info("Received key={}, value={}", k, v));
}
For a given input of 4 records
key=111, value="a"
key=111, value="a"
key=222, value="b"
key=111, value="a"
I am expecting to get only 2 records
key=111, value="a"
key=222, value="b"
But I am getting all 4 records. Any help would be really appreciated!
Upvotes: 1
Views: 1197
Reputation: 7583
You can group by the key and aggregate the events. Although you are not concatenating strings during the aggregation, the aggregate transformation is used here only to emit the latest value for each of the keys you group by (111 or 222). Your use case is really a distinct aggregation: on every record the aggregator receives (key, value, aggregate) and keeps only value, which is the latest value for that key.
@Slf4j
@Configuration
@EnableAutoConfiguration
public class KafkaAggFunctionalService {

    @Bean
    public Function<KTable<String, String>, KStream<String, String>> aggregate() {
        return table -> table
                .toStream()
                .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(() -> "",
                        (key, value, aggregate) -> value,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()))
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }
}
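Since the aggregator above never uses the accumulated value, the same idea can be written with reduce, which simply keeps the newest value per key. Below is a minimal sketch of that variant (not from the linked examples); the class name, the dedup bean name and the dedup-store store name are made up for illustration:

import java.util.function.Function;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class KafkaDedupFunctionalService { // class, bean and store names are illustrative

    @Bean
    public Function<KTable<String, String>, KStream<String, String>> dedup() {
        return table -> table
                .toStream()
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // the reducer ignores the previous value, so the store holds only the latest value per key
                .reduce((previous, latest) -> latest,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("dedup-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()))
                .toStream()
                .peek((k, v) -> log.info("Received key={}, value={}", k, v));
    }
}

If you rename the bean like this, remember to update the function definition and the corresponding -in-0/-out-0 bindings in your application.yml to match the new name.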
This git repo has a lot of examples. The one that looks very similar to yours is this.
Upvotes: 2
Reputation: 56
I think the problem you are trying to solve would be well served by a compacted topic. Once you deliver data with the same key to a compacted topic and compaction is enabled at the broker level (which it is by default), each broker starts a compaction manager thread and a number of compaction threads, which are responsible for performing the compaction tasks. Compaction does nothing but keep the latest value for each key and clean up the older (dirty) entries.
Refer to this Kafka documentation for more details.
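If you want to try this, the output topic can be created as a compacted topic up front. Below is a minimal sketch (not from the Kafka documentation) using the Kafka AdminClient; the topic name, broker address, partition/replica counts and the aggressive compaction settings are assumptions for illustration:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CompactedTopicSetup {

    public static void main(String[] args) throws Exception {
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // topic name, partition and replica counts are illustrative
            NewTopic topic = new NewTopic("output.topic", 1, (short) 1)
                    .configs(Map.of(
                            // keep only the latest record per key once compaction runs
                            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                            // illustrative values that make compaction kick in sooner than the defaults
                            TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.01",
                            TopicConfig.SEGMENT_MS_CONFIG, "100"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}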
Upvotes: 0