
Reputation: 25770

Spring Kafka and Kafka Streams

In Spring Boot application I'm trying to configure Kafka Streams. With plain Kafka topics, everything is working fine, but I unable to get working Spring Kafka Streams.

This is my configuration:

public class KafkaStreamsConfig {

    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);

    public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {

        KStream<String, String> stream ="post.sent");

        stream.mapValues(post -> post.toString()).to("streamingTopic2");


        return stream;

    public NewTopic kafkaTopicTest() {
        return new NewTopic("streamingTopic2", 1, (short) 1);

    @KafkaListener(topics = "streamingTopic2", containerFactory = "kafkaListenerContainerFactory")
    public void testListener(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {

        String value = consumerRecord.value();

        System.out.println("VALUE: " + value);



I want to create a stream based on post.sent topic. To apply a simple transformation and to send the messages from this stream to test streamingTopic2 topic.

Right now when I send the message into post.sent topic I unable immediately to get it in "streamingTopic2" but after my application restart it start fails with the following error:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition streamingTopic2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 53, 98, 56, 49, 53, 99, 97, 51, 52, 102, 97, 101, 102, 48, 52, 55, 97, 52, 48, 48, 100, 52, 50, 97, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, 83, 69, 78, 84, 34, 44, 34, 101, 120, 116, 101, 114, 110, 97, 108, 80, 111, 115, 116, 73, 100, 34, 58, 34, 48, 53, 54, 97, 57, 51, 49, 101, 45, 56, 97, 53, 100, 45, 52, 100, 52, 52, 45, 97, 101, 50, 48, 45, 53, 99, 51, 53, 52, 56, 57, 52, 98, 97, 53, 49, 34, 44, 34, 99, 104, 97, 116, 78]] from topic [streamingTopic2]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose( ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader.readValue( ~[jackson-databind-2.9.6.jar:2.9.6]
    at ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord( ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300( ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords( ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400( ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clien

To post.sent I send the following messages <String, Post> where the Post is my own complex type but I don't know right now how to translate it to <String, String> in kStream() in order to be able to consume it in testListener().

Please suggest how to make it work.

Upvotes: 4

Views: 4818

Answers (2)


Reputation: 125

To use Streams you need to do something like this:


    public void process(@Input("input") KTable<String,MyMessage> myMessages,
                        @Input("streammapping") KTable<String, StreamMapping> streamMessages) {

interface MyStreamProcessor {
KTable<?, ?> input();

KTable <?, ?> streamMapping();


and then put your processing code in the method body. KStreams work the same way

Upvotes: 2


Reputation: 191671

Regarding your usage of

return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class)); in order to define the consumerFactory bean

Well, I can't say how you have Produced data into the topic, but the JSON parser is failing.

Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]

Based on Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105 ..., I would say you have at some point done a byte[] producer, rather than explicitly defined using StringSerializer or JSONSerializer during production.

You could get around your error by using new StringDeserializer() or even do no conversion at all with ByteArrayDeserializer in your consumerFactory, but then you'll still need to handle how to later parse that event into a object that you want to manipulate and extract fields from.

Upvotes: 7

Related Questions