rishi007bansod

Reputation: 1479

Error While Deserializing object from Kafka Streams

On the Kafka Streams side, when writing to a topic, I serialize the windowed key [test_id@timestamp1/timestamp2] with Kafka's Serdes.String() serializer. When I read the same key back in another application, deserialization fails with the following error:

com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
 at [Source: [B@37e7c0b2; line: 1, column: 1]
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
        at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
        at TestAlert$3.extract(TestAlert.java:483)
        at TestAlert$3.extract(TestAlert.java:1)
        at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
        at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Below is the code I wrote to serialize the windowed key. Here testWinAlerts is the aggregate result after windowing, which has Windowed<String> as its key type.

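testWinAlerts
    .toStream((k, v) -> k.toString())   // Windowed<String> key becomes a String such as "[test_id@timestamp1/timestamp2]"
    .filter((k, v) -> v != null)        // drop records with a null value
    .to(Serdes.String(), aggrMessageSerde, "Some-Topic");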

Below is the deserialization code that converts the byte[] back into a String key. Here msg.key() (specific to Ignite) returns the key as bytes after it is consumed from the topic.

String windowKey = objectMapper.readValue(msg.key(), String.class); 

In a further test, I removed the "@", "/", "[", and "]" characters from the window string before writing it to the Kafka topic, and then deserialization worked. In the actual implementation, however, I don't want the extra overhead of stripping these characters from the string. How can I get rid of this error?

Upvotes: 0

Views: 2230

Answers (1)

Dmitry Minkovsky

Reputation: 38203

You are serializing the key as a plain string with StringSerde, but then trying to deserialize it with Jackson, which expects JSON as its input. A regular string can be any series of characters, but a JSON string looks like "string": by definition it begins and ends with ". Your key begins with [, so Jackson reads it as the start of a JSON array, hence the START_ARRAY error. So you can't deserialize an arbitrary string with Jackson; in its serialized form it must begin and end with ". Why don't you just use the StringSerde to deserialize the key?
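As a minimal sketch of that suggestion: by default Kafka's string serializer writes the string's UTF-8 bytes, so the consumer can decode the key bytes as UTF-8 instead of passing them to Jackson. The class and method names below (WindowKeyDecoder, decodeKey) are hypothetical, and the sample key only imitates the [test_id@timestamp1/timestamp2] format from the question; in the real extractor the bytes would come from msg.key().

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a key that was written with Serdes.String(),
// assuming the default UTF-8 encoding was not overridden on the producer side.
public class WindowKeyDecoder {

    // Turns the raw key bytes back into the original String; null stays null.
    static String decodeKey(byte[] keyBytes) {
        if (keyBytes == null) {
            return null;
        }
        return new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for msg.key(): the bytes a UTF-8 string serializer would produce.
        byte[] rawKey = "[test_id@1000/2000]".getBytes(StandardCharsets.UTF_8);

        String windowKey = decodeKey(rawKey);
        System.out.println(windowKey);
    }
}
```

If the Kafka client library is already on the consumer's classpath, Serdes.String().deserializer() should do the same job while honoring any configured encoding; the plain-Java version above just avoids the extra dependency.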

Upvotes: 3
