Neoster

Reputation: 195

Using protobuf with flink

I'm using Flink to read data from Kafka and convert it to Protobuf. When I run the Java application I get the error below. If I rename the unknownFields variable to something else it works, but it is impractical to make that change in every Protobuf class.

I also tried to deserialize directly when reading from Kafka, but I'm not sure which TypeInformation the getProducedType() method should return.

    public static class ProtoDeserializer implements DeserializationSchema {

        // deserialize(...) and isEndOfStream(...) omitted

        @Override
        public TypeInformation getProducedType() {
            // Not sure what to return here
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }

Appreciate all the help. Thanks.

    java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage.Please use unique field names through your classes hierarchy
        at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)

Code:

    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr",new ByteDes(),p);

    DataStream<byte[]> input = env.addSource(kafkaConsumer);
    DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
        @Override
        public PBAddress map(byte[] value) throws Exception {
            PBAddress addr = PBAddress.parseFrom(value);
            return addr;
        }
    });
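
For context, the exception message complains that a field name (unknownFields) is declared more than once in the class hierarchy. The following is a simplified, dependency-free sketch of that kind of check. It is not Flink's actual TypeExtractor code, and the classes BaseMessage and GeneratedAddress and the method duplicateFieldNames are hypothetical stand-ins for the Protobuf base class and a generated message.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class DuplicateFieldCheck {

    // Hypothetical stand-in for a Protobuf base class.
    static class BaseMessage {
        protected Object unknownFields;
    }

    // Hypothetical stand-in for a generated message that redeclares the field.
    static class GeneratedAddress extends BaseMessage {
        private Object unknownFields; // same name as the inherited field
        private String street;
    }

    // Walk from the class up to (but excluding) Object and collect every
    // non-static, non-transient field name that appears more than once.
    static List<String> duplicateFieldNames(Class<?> type) {
        List<String> seen = new ArrayList<>();
        List<String> duplicates = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                    continue;
                }
                if (seen.contains(f.getName())) {
                    duplicates.add(f.getName());
                } else {
                    seen.add(f.getName());
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // A POJO-style analysis that requires unique names would reject this class.
        System.out.println(duplicateFieldNames(GeneratedAddress.class)); // prints [unknownFields]
    }
}
```

This would explain why renaming the field makes the error go away: the name then no longer collides with the one declared in the superclass.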

Upvotes: 2

Views: 5671

Answers (3)

David Anderson

Reputation: 43612

https://issues.apache.org/jira/browse/FLINK-11333 is the JIRA ticket tracking the issue of implementing first-class support for Protobuf types with evolvable schema. You'll see that there was a pull request quite some time ago, which hasn't been merged. I believe the problem was that there is no support there for handling state migration in cases where Protobuf was previously being used by registering it with Kryo.

Meanwhile, the Stateful Functions project (statefun is a new API that runs on top of Flink) is based entirely on Protobuf, and it includes support for using Protobuf with Flink: https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf. (The entry point to that package is ProtobufTypeInformation.java.) I suggest exploring this package (which includes nothing statefun specific); however, it doesn't concern itself with migrations from Kryo either.

Upvotes: 1

湘伦Rong

Reputation: 11

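You could try registering a Kryo serializer for the Protobuf class, for example the ProtobufSerializer from the chill-protobuf library:

    env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);

or a custom serializer:

    env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, PBAddressSerializer.class);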

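    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.protobuf.Message;

    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;

    public class PBAddressSerializer extends Serializer<Message> {
      // Cache of the static parseFrom(byte[]) method for each message class
      private final Map<Class<?>, Method> parseMethods = new HashMap<>();

      protected Method getParse(Class<?> cls) throws NoSuchMethodException {
        Method method = parseMethods.get(cls);
        if (method == null) {
          method = cls.getMethod("parseFrom", byte[].class);
          parseMethods.put(cls, method);
        }
        return method;
      }

      @Override
      public void write(Kryo kryo, Output output, Message message) {
        // Length-prefixed framing: variable-length size, then the serialized message
        byte[] ser = message.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
      }

      @Override
      public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
          int size = input.readInt(true);
          // readBytes reads exactly size bytes; read(byte[]) may return fewer
          byte[] barr = input.readBytes(size);
          return (Message) getParse(pbClass).invoke(null, barr);
        } catch (Exception e) {
          throw new RuntimeException("Could not create " + pbClass, e);
        }
      }
    }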
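
The custom serializer frames each message as a length followed by the serialized bytes. As a rough illustration of that framing only, here is a dependency-free sketch using java.io instead of Kryo. It assumes a fixed 4-byte length rather than Kryo's variable-length int, and the names LengthPrefixedFraming, frame and unframe are made up for this example. The point it shows is that the reading side should insist on exactly the announced number of bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LengthPrefixedFraming {

    // Write a 4-byte length followed by the payload bytes.
    static byte[] frame(byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the length, then exactly that many bytes.
    // readFully throws EOFException if the input ends early.
    static byte[] unframe(byte[] framed) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            int size = in.readInt();
            byte[] payload = new byte[size];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the bytes a Protobuf message would produce via toByteArray()
        byte[] original = "stand-in payload".getBytes(StandardCharsets.UTF_8);
        byte[] restored = unframe(frame(original));
        System.out.println(Arrays.equals(original, restored)); // prints true
    }
}
```

In the Kryo version, the parsed bytes would then be handed to the message class's parseFrom method.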

Upvotes: 1

Stephen Jason

Reputation: 1

Try this:

    public class ProtoDeserializer implements DeserializationSchema<PBAddress> {
        @Override
        public TypeInformation<PBAddress> getProducedType() {
            return TypeInformation.of(PBAddress.class);
        }

        // deserialize(...) and isEndOfStream(...) omitted
    }

Upvotes: 0
