Riat Abduramani

Reputation: 63

Apache Kafka - Implementing a KTable and producing event using CloudEvent

I have a Kafka Streams topology that aggregates into a KTable and uses CloudEvents to produce events, but the event produced from the KTable is not formatted as a CloudEvent, and I cannot tell why. The implementation is below:

    public void initKafkaStream() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        PojoCloudEventDataMapper<TicketEvent> ticketEventMapper = PojoCloudEventDataMapper.from(objectMapper, TicketEvent.class);
        KStream<String, CloudEvent> rawTicketStream = streamsBuilder.stream(rawTicketEvent, Consumed.with(Serdes.String(), cloudEventSerde));

        rawTicketStream
                .mapValues(e -> convertToPojo(e, ticketEventMapper))
                .filter((k, v) -> v != null)
                .groupByKey()
                .aggregate(
                        AggregatedTicketEvent::new,
                        (key, val, agg) -> doAggregation(agg, val),
                        Materialized
                                .<String, AggregatedTicketEvent, KeyValueStore<Bytes, byte[]>>as("aggregatedTicket")
                                .withValueSerde(aggregatedTicketEventSerde)
                                .withLoggingDisabled()
                )
                .mapValues(result -> {
                    try {
                        return CloudEventBuilder.v1()
                                .withId(UUID.randomUUID().toString())
                                .withType("ticket_update")
                                .withSource(sourceTemplate.expand(result.getCurrent().getId()))
                                .withTime(result.getMeta().getOccurredAt())
                                .withData(objectMapper.writeValueAsBytes(result))
                                .withDataContentType("application/json")
                                .build();
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .toStream()
                .to(aggregatedTicketEvent, Produced.with(Serdes.String(), cloudEventSerde));

        streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
        streams.setUncaughtExceptionHandler(ex -> StreamThreadExceptionResponse.REPLACE_THREAD);

        streams.start();
    }

Has anyone had such an issue?

Thanks in advance

Upvotes: 0

Views: 388

Answers (1)

Riat Abduramani

Reputation: 63

The issue was that the CloudEvents configuration I had set in the props was overwritten by Kafka Streams in the serializer/deserializer, so the serializer fell back to its default, Encoding.BINARY. In binary encoding the CloudEvents attributes are carried only in the Kafka record headers, and the payload contains just the event data. To make sure the serializers have the correct configuration, I now configure the CloudEventSerializer and CloudEventDeserializer explicitly and build the serde from them with Serdes.serdeFrom(), as follows:

    Map<String, Object> ceSerializerConfigs = new HashMap<>();
    ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
    ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

    CloudEventSerializer serializer = new CloudEventSerializer();
    serializer.configure(ceSerializerConfigs, false);

    CloudEventDeserializer deserializer = new CloudEventDeserializer();
    deserializer.configure(ceSerializerConfigs, false);

    this.cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);

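To get the CloudEvents format in the JSON payload, use Encoding.STRUCTURED together with the JSON content type (JsonFormat.CONTENT_TYPE). With that combination the whole event, attributes and data, is written to the payload instead of being split between headers and payload.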
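To make the difference between the two encodings concrete, here is a rough sketch in plain Java. It does not use the CloudEvents SDK or any Kafka classes: `EncodingSketch`, `Record`, `binary` and `structured` are hypothetical names made up for illustration, and the layout follows my understanding of the CloudEvents Kafka protocol binding ("ce_"-prefixed headers in binary mode, a single JSON envelope in structured mode). The real serializer output may differ in details.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: mimics where the event ends up in each encoding.
// Not the CloudEvents SDK; all names here are hypothetical.
final class EncodingSketch {

    // Hypothetical stand-in for a Kafka record: headers plus a value.
    static final class Record {
        final Map<String, String> headers = new LinkedHashMap<>();
        String value;
    }

    // Binary mode: every context attribute becomes a "ce_" header, the data
    // content type goes into "content-type", and the value is only the data.
    static Record binary(Map<String, String> attributes, String dataContentType, String jsonData) {
        Record record = new Record();
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            record.headers.put("ce_" + attribute.getKey(), attribute.getValue());
        }
        record.headers.put("content-type", dataContentType);
        record.value = jsonData;
        return record;
    }

    // Structured mode: attributes and data are written together into one JSON
    // envelope in the value; the header only announces the envelope format.
    // Assumes attribute values need no JSON escaping (fine for this sketch).
    static Record structured(Map<String, String> attributes, String dataContentType, String jsonData) {
        Record record = new Record();
        record.headers.put("content-type", "application/cloudevents+json");
        StringBuilder envelope = new StringBuilder("{");
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            envelope.append('"').append(attribute.getKey()).append("\":\"")
                    .append(attribute.getValue()).append("\",");
        }
        envelope.append("\"datacontenttype\":\"").append(dataContentType).append("\",");
        envelope.append("\"data\":").append(jsonData).append('}');
        record.value = envelope.toString();
        return record;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("specversion", "1.0");
        attributes.put("id", "42");
        attributes.put("type", "ticket_update");
        attributes.put("source", "/tickets/42");
        String data = "{\"status\":\"open\"}";

        Record b = binary(attributes, "application/json", data);
        Record s = structured(attributes, "application/json", data);
        System.out.println("binary headers:     " + b.headers);
        System.out.println("binary value:       " + b.value);
        System.out.println("structured headers: " + s.headers);
        System.out.println("structured value:   " + s.value);
    }
}
```

In the binary case a consumer that only looks at the record value sees the bare aggregate JSON, which matches what I observed on the output topic before changing the serde configuration.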

Hope this will help someone who's struggling with this issue!

Best,

Upvotes: 2
