Mateusz

Reputation: 1197

Correctly delete a record from a Kafka KTable

I want to correctly delete an object from a KTable.

I have the following stream processing the data:

input
.map(this::deserialize)
.filter((key, event) ->
    Arrays.asList(
        "AssessmentInitialized",
        "AssessmentRenamed",
        "AssessmentDeleted"
    ).contains(event.eventType()))
.map( ... do mapping  ... )
.groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
    Assessment::new,
    (key, event, assessment) -> assessment.handleEvent(event),
    Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(assessmentSerde)
);

and assessment.handleEvent(event) returns null when it processes an AssessmentDeleted event. The serde is this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper);, where the mapper is a default com.fasterxml.jackson.databind.ObjectMapper bean.
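For reference, a minimal sketch of what such a handler might look like (the DomainEvent type is taken from the topology above; the applyEvent helper is a hypothetical name, not from the original code):

public Assessment handleEvent(DomainEvent event) {
    if ("AssessmentDeleted".equals(event.eventType())) {
        // Returning null from the aggregator removes the key from the
        // KTable's state store and emits a tombstone (a record with a
        // null value) to the changelog topic.
        return null;
    }
    // Otherwise apply the event to the current state.
    return applyEvent(event); // hypothetical helper
}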

After the event is processed by this stream, I see the following value in the KTable changelog:

{
  "topic": "events-consumer-v1-assessments-store-changelog",
  "key": "5d6d7a70-c907-460f-ac88-69705f079939",
  "value": "ée",
  "partition": 0,
  "offset": 2
}

And that doesn't look like what I want to have. Is this way of deleting an object correct? I expected that pushing null as the value of the aggregate operation would remove the record, but it looks like some garbage is left behind, and I'm not sure whether it's a mapper problem, an incorrect way of deleting, or correct KTable behavior.

Upvotes: 0

Views: 3694

Answers (1)

Bartosz Wardziński

Reputation: 6593

In your case it seems the issue was in the tool you used to inspect the topic: for some reason it didn't deserialize the null value properly. It's always good to check with the standard Kafka tools first (kafka-console-consumer.sh, kafka-console-producer.sh).
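If you prefer to verify it from code, here is a minimal sketch of a plain consumer that reads the changelog topic without deserializing the value, so a tombstone shows up as a null value rather than as garbage (the bootstrap address and group id are assumptions):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ChangelogInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-inspector");     // assumption
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(
                props, new StringDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(Collections.singletonList(
                "events-consumer-v1-assessments-store-changelog"));
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // A tombstone arrives as a record whose value is null.
                    System.out.printf("key=%s value=%s%n",
                        record.key(),
                        record.value() == null ? "<tombstone>" : new String(record.value()));
                }
            }
        }
    }
}

If the deletion worked, the record for your key should be printed as <tombstone> here even though the other tool showed garbage.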

Upvotes: 1
