Jay

Reputation: 1

Thrift Serialization and Deserialization in Kafka Streaming

I am using Thrift only for serialization and deserialization, to improve performance while streaming byte data from Kafka.

When I deserialize, I keep getting this error:

org.apache.thrift.protocol.TProtocolException: Unrecognized type 123

My Code

public void streamMessageByte() {
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<Integer, byte[]> stream = builder.stream(kafka_topic);
    deserializer = new TDeserializer();
    serializer = new TSerializer();
    // 'deser' is the Thrift-generated object whose layout should match the byte array
    stream.map((k, v) -> {
        try {
            deserializer.deserialize(deser, v);
        } catch (TException e) {
            // exception is currently ignored
        }
        return null;
    });
}

Upvotes: 0

Views: 2379

Answers (1)

Alex P.

Reputation: 3171

I ran into this problem when the data was serialized with one format and deserialized with another.

In my case the serializer was using Jackson's ObjectMapper (JSON) and the deserializer was using TDeserializer with TBinaryProtocol. Example:

@Test
public void testSerDe() throws TException, JsonProcessingException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   // Serialize as JSON text with Jackson
   ObjectMapper mapper = new ObjectMapper();
   byte[] serialized = mapper.writeValueAsString(person).getBytes();
   // Deserialize with the Thrift binary protocol: the formats do not match
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   Person desPerson = new Person();
   deserializer.deserialize(desPerson, serialized);
   assertEquals(person, desPerson);
}

That would throw org.apache.thrift.protocol.TProtocolException: Unrecognized type 123
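The number in the message is a useful hint: 123 is the byte value of the character `{`, which is how a JSON object starts, while the field type codes that the Thrift binary protocol expects are small integers. So "Unrecognized type 123" usually suggests the payload is JSON (or some other text) rather than Thrift binary data. The following is only a sketch of a quick check you could run on the incoming bytes before deserializing; `PayloadSniffer` and its methods are hypothetical helper names, not part of Thrift or Kafka, and the check is a heuristic for debugging rather than a reliable format detector.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical debugging helper, not part of Thrift or Kafka.
final class PayloadSniffer {
    private PayloadSniffer() {}

    /** Returns the first byte as an unsigned value, or -1 for a null or empty payload. */
    static int firstByte(byte[] payload) {
        return (payload == null || payload.length == 0) ? -1 : payload[0] & 0xFF;
    }

    /** Heuristic only: true when the payload starts with '{' or '['. */
    static boolean looksLikeJson(byte[] payload) {
        int b = firstByte(payload);
        return b == '{' || b == '[';
    }

    public static void main(String[] args) {
        byte[] json = "{\"name\":\"Thomas\",\"age\":23}".getBytes(StandardCharsets.UTF_8);
        System.out.println(firstByte(json));     // prints 123
        System.out.println(looksLikeJson(json)); // prints true
    }
}
```

Logging `PayloadSniffer.firstByte(v)` for a few records inside the stream's `map` call would show whether the producer is actually writing Thrift binary data to the topic.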

It should work if you serialize and deserialize with the same protocol. Here is an example:

@Test
public void testSerDe() throws TException {
   // Given
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   TSerializer serializer = new TSerializer(TBinaryProtocol::new);
   // When
   byte[] serializedPerson = serializer.serialize(person);
   // Then
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   Person dePerson = new Person();
   deserializer.deserialize(dePerson, serializedPerson);
   assertEquals(person, dePerson);
}

Upvotes: 1
