Reputation: 1479
On the Kafka Streams side, when writing to a topic, I serialize the windowed key [test_id@timestamp1/timestamp2] using Kafka's Serdes.String() serializer. When I retrieve the same key from another application, I get the following error during deserialization:
com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token
at [Source: [B@37e7c0b2; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)
at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at TestAlert$3.extract(TestAlert.java:483)
at TestAlert$3.extract(TestAlert.java:1)
at org.apache.ignite.stream.StreamAdapter.addMessage(StreamAdapter.java:181)
at org.apache.ignite.stream.kafka.KafkaStreamer.access$100(KafkaStreamer.java:47)
at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The following is the code I have written to serialize the windowed key. Here testWinAlerts is the aggregate result after windowing, which has Windowed<String> as its key:
testWinAlerts
    .toStream((k, v) -> k.toString())
    .filter((k, v) -> v != null)
    .to(Serdes.String(), aggrMessageSerde, "Some-Topic");
Below is the code on the consuming side that converts the byte[] key back to a String. Here msg.key() (specific to Ignite) returns the key as bytes after it is consumed from the topic.
String windowKey = objectMapper.readValue(msg.key(), String.class);
On further testing, I tried removing the "@", "/", "[", and "]" characters from the window string before writing it to the Kafka topic, and then it worked. But in the actual implementation I don't want the extra overhead of stripping these characters from the string. How can I get rid of this error?
Upvotes: 0
Views: 2230
Reputation: 38203
You are serializing the key as a plain string with StringSerde, but then you are trying to deserialize it with Jackson, which expects JSON as its input. A regular string can be any series of characters, but a JSON string looks like "string": by definition it begins and ends with ". So you can't deserialize an arbitrary string with Jackson; in its serialized state it must begin and end with ". Your key begins with [, which Jackson reads as the start of a JSON array, hence the START_ARRAY error. Why don't you just use the StringSerde to deserialize the key?
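As a rough sketch of that suggestion: Kafka's String serializer writes the raw UTF-8 bytes of the string by default, so the key can be decoded as UTF-8 text instead of being parsed as JSON. The class and method names below (WindowKeyDecoding, decodeKey) are hypothetical and only for illustration, and the sketch assumes the producer kept the default UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;

public class WindowKeyDecoding {

    // Hypothetical helper: decodes key bytes as UTF-8 text, which is the
    // default encoding used by Kafka's String serializer. Assumes the
    // producer did not configure a different encoding.
    static String decodeKey(byte[] keyBytes) {
        return keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for msg.key(): the bytes a String serializer would
        // have written for a windowed key rendered via toString().
        byte[] keyBytes = "[test_id@timestamp1/timestamp2]".getBytes(StandardCharsets.UTF_8);

        // No JSON parsing involved, so "[", "@" and "/" need no special handling.
        String windowKey = decodeKey(keyBytes);
        System.out.println(windowKey);
    }
}
```

In the Ignite extractor, the objectMapper.readValue(msg.key(), String.class) call would be replaced by a decode like this. If the Kafka client library is on the classpath, new StringDeserializer().deserialize(topic, msg.key()) from org.apache.kafka.common.serialization should do the same thing.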
Upvotes: 3