Reputation: 1
I am using Thrift only for serialization and deserialization, to improve performance while streaming byte data from Kafka.
When I deserialize, I keep getting this error:
org.apache.thrift.protocol.TProtocolException: Unrecognized type 123
My code:
public void streamMessageByte() {
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<Integer, byte[]> stream = builder.stream(kafka_topic);
    deserializer = new TDeserializer();
    serializer = new TSerializer();
    // 'deser' is the Thrift-generated object whose struct matches the byte array data format
    stream.map((k, v) -> {
        try {
            // The TProtocolException is thrown from this call
            deserializer.deserialize(deser, v);
        } catch (TException e) {
        }
        return null;
    });
}
Upvotes: 0
Views: 2379
Reputation: 3171
I came across this problem when I was using different protocols to serialize and deserialize. The serializer was using ObjectMapper (JSON) and the deserializer was using TDeserializer with TBinaryProtocol. Example:
@Test
public void testSerDe() throws TException, JsonProcessingException {
final Person person = new Person("Thomas", Byte.valueOf("23"));
JsonSerializer serializer = new JsonSerializer();
ObjectMapper mapper = new ObjectMapper();
byte[] serialized = mapper.writeValueAsString(person).getBytes();
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
Person desPerson = new Person();
deserializer.deserialize(desPerson, serialized);
assertEquals(person, desPerson);
}
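That would throw org.apache.thrift.protocol.TProtocolException: Unrecognized type 123. The value 123 is the ASCII code of '{', the first byte of the JSON text, which the binary protocol tries to read as a Thrift field type.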
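A quick way to check whether the same mismatch applies to the messages on a Kafka topic is to look at the first byte of a payload before handing it to TDeserializer. Below is a minimal sketch in plain Java with no Thrift or Kafka dependency. The class PayloadSniffer and its methods are hypothetical names made up for this illustration, and the check is only a heuristic (JSON with leading whitespace, for example, would not be detected).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Thrift or Kafka: inspects raw payload bytes.
final class PayloadSniffer {

    // Returns the first byte as an unsigned value (0-255), or -1 for an empty payload.
    static int firstByte(byte[] payload) {
        if (payload == null || payload.length == 0) {
            return -1;
        }
        return payload[0] & 0xFF;
    }

    // True if the payload starts with '{' (ASCII 123), i.e. it is probably a JSON object
    // rather than data written with the Thrift binary protocol.
    static boolean looksLikeJsonObject(byte[] payload) {
        return firstByte(payload) == '{';
    }

    public static void main(String[] args) {
        byte[] json = "{\"name\":\"Thomas\",\"age\":23}".getBytes(StandardCharsets.UTF_8);
        System.out.println(firstByte(json));           // prints 123
        System.out.println(looksLikeJsonObject(json)); // prints true
    }
}
```

If the first byte of the Kafka message value is 123, the producer is most likely writing JSON (or some other text format starting with '{'), and the fix belongs on the producer or in the choice of deserializer, not in the Thrift struct.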
It should work if you serialize and deserialize the same way. Here is an example:
@Test
public void testSerDe() throws TException {
// Given
final Person person = new Person("Thomas", Byte.valueOf("23"));
TSerializer serializer = new TSerializer(TBinaryProtocol::new);
// When
byte[] serializedPerson = serializer.serialize(person);
// Then
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
Person dePerson = new Person();
deserializer.deserialize(dePerson, serializedPerson);
assertEquals(person, dePerson);
}
Upvotes: 1