Reputation: 195
I'm using Flink to read data from Kafka and convert it to Protobuf. The problem I'm facing is that when I run the Java application I get the error below. If I rename the unknownFields variable to something else it works, but it's hard to make that change in every generated Protobuf class.
I also tried to deserialize directly when reading from Kafka, but I'm not sure what TypeInformation should be returned from the getProducedType() method.
public static class ProtoDeserializer implements DeserializationSchema {
    @Override
    public TypeInformation getProducedType() {
        // TODO Auto-generated method stub
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
Appreciate all the help. Thanks.
java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage. Please use unique field names through your classes hierarchy
    at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)
Code:
FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr", new ByteDes(), p);
DataStream<byte[]> input = env.addSource(kafkaConsumer);

DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
    @Override
    public PBAddress map(byte[] value) throws Exception {
        PBAddress addr = PBAddress.parseFrom(value);
        return addr;
    }
});
Upvotes: 2
Views: 5671
Reputation: 43612
https://issues.apache.org/jira/browse/FLINK-11333 is the JIRA ticket tracking the issue of implementing first-class support for Protobuf types with an evolvable schema. You'll see that a pull request was opened quite some time ago but hasn't been merged. I believe the problem was that it offers no support for state migration in cases where Protobuf had previously been used by registering it with Kryo.
Meanwhile, the Stateful Functions project (statefun, a new API that runs on top of Flink) is based entirely on Protobuf, and it includes support for using Protobuf with Flink: https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf. (The entry point to that package is ProtobufTypeInformation.java.) I suggest exploring this package (nothing in it is statefun-specific); however, it doesn't concern itself with migrations from Kryo either.
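For illustration only, here is a rough sketch of how that type information might be plugged into a regular DataStream job. The ProtobufTypeInformation constructor shown is an assumption based on the class name and the package path in the link above (verify it against the sources), and PBAddress stands in for your generated class:

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufTypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProtobufTypeInfoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumption: ProtobufTypeInformation is constructed from the generated
        // message class; check the linked statefun sources before relying on this.
        TypeInformation<PBAddress> typeInfo = new ProtobufTypeInformation<>(PBAddress.class);

        // Passing the TypeInformation explicitly keeps Flink's TypeExtractor from
        // analyzing the generated class as a POJO (which is what fails on unknownFields).
        env.fromCollection(Collections.singletonList(PBAddress.getDefaultInstance()), typeInfo)
           .print();

        env.execute("protobuf-typeinfo-sketch");
    }
}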
Upvotes: 1
Reputation: 11
Maybe you should try the following (ProtobufSerializer here is presumably the one shipped with chill-protobuf):

env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);

or

env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, PBAddressSerializer.class);
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.Message;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class PBAddressSerializer extends Serializer<Message> {

    // Cache of each message class's static parseFrom(byte[]) method.
    private final Map<Class<?>, Method> parseMethods = new HashMap<>();

    protected Method getParse(Class<?> cls) throws NoSuchMethodException {
        Method method = parseMethods.get(cls);
        if (method == null) {
            method = cls.getMethod("parseFrom", byte[].class);
            parseMethods.put(cls, method);
        }
        return method;
    }

    @Override
    public void write(Kryo kryo, Output output, Message message) {
        // Let Protobuf serialize the message, then write it length-prefixed.
        byte[] ser = message.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
    }

    @Override
    public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
            int size = input.readInt(true);
            byte[] barr = new byte[size];
            // readBytes fills the whole array; input.read(barr) may return fewer bytes.
            input.readBytes(barr);
            // parseFrom is static, so the reflective invocation target is null.
            return (Message) getParse(pbClass).invoke(null, barr);
        } catch (Exception e) {
            throw new RuntimeException("Could not create " + pbClass, e);
        }
    }
}
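A quick way to sanity-check a serializer like this outside of Flink is a plain Kryo round trip. The sketch below uses only standard Kryo APIs and assumes your generated PBAddress class; populate whatever fields your schema requires:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class PBAddressSerializerCheck {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(PBAddress.class, new PBAddressSerializer());

        // Build a sample message; set any required fields your schema defines.
        PBAddress original = PBAddress.newBuilder().build();

        // Write with the custom serializer into a growable buffer.
        Output output = new Output(1024, -1);
        kryo.writeObject(output, original);

        // Read it back and compare with the original.
        Input input = new Input(output.toBytes());
        PBAddress copy = kryo.readObject(input, PBAddress.class);
        System.out.println(original.equals(copy)); // expect true
    }
}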
Upvotes: 1
Reputation: 1
try this:
public class ProtoDeserializer implements DeserializationSchema<PBAddress> {
    @Override
    public TypeInformation<PBAddress> getProducedType() {
        return TypeInformation.of(PBAddress.class);
    }
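For completeness, a full implementation also needs deserialize() and isEndOfStream(). A minimal sketch, assuming your generated PBAddress class (the DeserializationSchema package name varies across Flink versions):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class ProtoDeserializer implements DeserializationSchema<PBAddress> {

    @Override
    public PBAddress deserialize(byte[] message) throws IOException {
        // InvalidProtocolBufferException is an IOException, so no wrapping is needed.
        return PBAddress.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(PBAddress nextElement) {
        return false;
    }

    @Override
    public TypeInformation<PBAddress> getProducedType() {
        return TypeInformation.of(PBAddress.class);
    }
}

The Kafka source can then be declared as FlinkKafkaConsumer09<PBAddress> with new ProtoDeserializer() passed in place of ByteDes, so no separate map step is needed to parse the bytes.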
Upvotes: 0