Raúl García

Reputation: 344

Flink POJO serializer shows a lot of CPU time spent in KryoException<init>

I used to send a POJO from one vertex to another over a HASH connection, and never noticed anything unusual in the Flame Graph view.

I have now added an Async I/O step, so the keyBy happens after some external operations, and the Flame Graph now shows a large amount of time spent constructing what looks like a serialization exception (KryoException) that I don't understand.

The POJO is exactly the same as before. The only "change" is that previously other operations took much longer than serialization, whereas now most records need no external lookup, so this step is mostly a passthrough.

It has getters and setters for all fields, a public no-argument constructor, and even an explicit static TypeInformation field:

public class MyObject {
    public static final TypeInformation<MyObject> TYPE_INFORMATION = Types.POJO(MyObject.class,
            ImmutableMap.of(
                    "field1", Types.STRING,
                    "protobufField2", TypeInformation.of(ProtoClass1OuterClass.ProtoClass1.class),
                    "protobufField3", TypeInformation.of(ProtoClass1OuterClass.ProtoClass2.class),
                    "pojo2", Types.POJO(Pojo2.class)));

    private String field1;
    private ProtoClass1OuterClass.ProtoClass1 protobufField2;
    private ProtoClass1OuterClass.ProtoClass2 protobufField3;
    private Pojo2 pojo2;

    public MyObject() {
    }

    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public ProtoClass1OuterClass.ProtoClass1 getProtobufField2() {
        return protobufField2;
    }

    public void setProtobufField2(ProtoClass1OuterClass.ProtoClass1 protobufField2) {
        this.protobufField2 = protobufField2;
    }

    public ProtoClass1OuterClass.ProtoClass2 getProtobufField3() {
        return protobufField3;
    }

    public void setProtobufField3(ProtoClass1OuterClass.ProtoClass2 protobufField3) {
        this.protobufField3 = protobufField3;
    }

    public Pojo2 getPojo2() {
        return pojo2;
    }

    public void setPojo2(Pojo2 pojo2) {
        this.pojo2 = pojo2;
    }
}

Pojo2 is an abstract class like the following:

public abstract class Pojo2 implements Serializable {

    byte[] data;

    public Pojo2() {
        // POJO
    }

    protected Pojo2(byte[] data) {
        this.data = data;
    }

    public abstract String function(String param);

    // POJO
    public byte[] getData() {
        return data;
    }

    // POJO
    public void setData(byte[] data) {
        this.data = data;
    }
    // More functions
}

All the types and subtypes are registered as well:

config.registerPojoType(Pojo2.class);
config.registerPojoType(Pojo2Child.class);
config.registerTypeWithKryoSerializer(ProtoClass1OuterClass.class, ProtobufSerializer.class);
config.registerTypeWithKryoSerializer(ProtoClass1.class, ProtobufSerializer.class);
config.registerTypeWithKryoSerializer(ProtoClass2.class, ProtobufSerializer.class);
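
As a rough sanity check of the POJO requirements listed above (public class, public no-argument constructor, every field either public or covered by a getter/setter pair), a reflection-based sketch like the following could be run against each class. This only approximates the documented rules and is not how Flink's own type extraction is implemented. `PojoRuleCheck`, `Good`, and `Bad` are hypothetical names used for illustration, not Flink APIs.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: approximates the documented POJO rules.
final class PojoRuleCheck {

    // Returns one message per rule violation; an empty list means none were found.
    static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            if (!Modifier.isPublic(type.getDeclaredConstructor().getModifiers())) {
                problems.add("no-argument constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-argument constructor");
        }
        // Walk the class hierarchy so inherited fields are checked too.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue; // skipped: not treated as a POJO field here
                }
                if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                    continue; // public non-final fields need no accessors
                }
                if (!hasAccessors(type, f.getName())) {
                    problems.add("field '" + f.getName() + "' lacks a public getter/setter pair");
                }
            }
        }
        return problems;
    }

    // Looks for public getX()/isX() and setX(value) methods, including inherited ones.
    private static boolean hasAccessors(Class<?> type, String field) {
        String suffix = Character.toUpperCase(field.charAt(0)) + field.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : type.getMethods()) {
            String name = m.getName();
            if (m.getParameterCount() == 0
                    && (name.equals("get" + suffix) || name.equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && name.equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Hypothetical sample that follows the rules.
    public static class Good {
        private String name;
        public Good() { }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Hypothetical sample with a private field and no accessors.
    public static class Bad {
        private int hidden;
        public Bad() { }
    }

    public static void main(String[] args) {
        System.out.println("Good: " + violations(Good.class));
        System.out.println("Bad:  " + violations(Bad.class));
    }
}
```

In the job above this would be called as `PojoRuleCheck.violations(MyObject.class)` and likewise for `Pojo2` and its subclasses. An empty result only means these structural rules hold; it says nothing about how the individual field types are serialized.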

[Flame Graph screenshot]

Upvotes: 2

Views: 117

Answers (0)
