Reputation: 1197
I want to correctly delete an object from a KTable.
I have the following stream processing code:
input
    .map(this::deserialize)
    .filter((key, event) ->
        Arrays.asList(
            "AssessmentInitialized",
            "AssessmentRenamed",
            "AssessmentDeleted"
        ).contains(event.eventType()))
    .map( ... do mapping ... )
    .groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
    .aggregate(
        Assessment::new,
        (key, event, assessment) -> assessment.handleEvent(event),
        Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
            .withKeySerde(Serdes.String())
            .withValueSerde(assessmentSerde)
    );
and assessment.handleEvent(event) returns null when it processes an AssessmentDeleted event. The serde is this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper); where objectMapper is the default com.fasterxml.jackson.databind.ObjectMapper bean.
After the event is processed by this stream, I see the following value in the KTable changelog topic:
{
  "topic": "events-consumer-v1-assessments-store-changelog",
  "key": "5d6d7a70-c907-460f-ac88-69705f079939",
  "value": "ée",
  "partition": 0,
  "offset": 2
}
That doesn't look like what I want. Is this the correct way to delete an object? I expected that returning null from the aggregate operation would remove the entry, but it looks like some garbage is left behind, and I'm not sure whether it's a mapper problem, an incorrect way of deleting, or correct KTable behavior.
Upvotes: 0
Views: 3694
Reputation: 6593
In your case, it seems the issue is in the tool you used to check the topic: for some reason it does not deserialize the null value properly.
It is always a good idea to check with the Kafka command-line tools first (kafka-console-consumer.sh, kafka-console-producer.sh).
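To make clear what a correct deletion should look like, here is a minimal sketch in plain Java that models the store and its changelog. It is not the Kafka Streams API: the names AggregateDeleteSketch, process, get, changelog and handleEvent are made up for illustration, and the aggregate is reduced to a String. It assumes the behavior described in the Kafka Streams documentation for aggregate, namely that returning null from the aggregator removes the key from the store and writes a record with a null value (a tombstone) to the changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a KTable aggregation and its changelog.
// This is an illustration only and does not use the Kafka Streams API.
class AggregateDeleteSketch {

    // One changelog record; a null value models a tombstone.
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> store = new HashMap<>();
    private final List<Entry> changelog = new ArrayList<>();

    // Hypothetical stand-in for assessment.handleEvent(event):
    // returns null on deletion, otherwise appends the event type to the state.
    static String handleEvent(String current, String eventType) {
        if ("AssessmentDeleted".equals(eventType)) {
            return null;
        }
        return current.isEmpty() ? eventType : current + "," + eventType;
    }

    // Models one aggregate step: load (or initialize) the state, apply the
    // event, then either delete the key or store the new state.
    void process(String key, String eventType) {
        String current = store.getOrDefault(key, ""); // "" plays the role of Assessment::new
        String updated = handleEvent(current, eventType);
        if (updated == null) {
            store.remove(key);
        } else {
            store.put(key, updated);
        }
        // The changelog always gets a record; for a deletion its value is null.
        changelog.add(new Entry(key, updated));
    }

    String get(String key) {
        return store.get(key);
    }

    List<Entry> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        AggregateDeleteSketch sketch = new AggregateDeleteSketch();
        sketch.process("a1", "AssessmentInitialized");
        sketch.process("a1", "AssessmentRenamed");
        sketch.process("a1", "AssessmentDeleted");
        for (Entry e : sketch.changelog()) {
            System.out.println(e.key + " -> " + (e.value == null ? "<tombstone>" : e.value));
        }
    }
}
```

Under that assumption, the record at offset 2 in your changelog should carry a null value. If a viewer shows something like "ée" there instead, I would suspect how the viewer renders null before suspecting the serde or the topology.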
Upvotes: 1