Alex Krauss

Reputation: 10393

Flink serialization of java.util.List and java.util.Map

My Flink pipeline currently uses a POJO that contains some Lists and Maps (of Strings), along the lines of:

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

For performance reasons, I want to avoid Kryo serialization, so I disabled the generic-type fallback with env.getConfig().disableGenericTypes(), as described in the Flink documentation.

Now, Flink complains about the lists:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

What is the preferred way of serializing such simple lists and maps in Flink? Internally, these are currently ArrayList and HashMap, but other implementations would also be fine. Flink seems to have a class org.apache.flink.api.common.typeutils.base.ListSerializer, but I do not know how to use it.

Upvotes: 7

Views: 9375

Answers (3)

j123b567

Reputation: 3439

You can annotate any class with @TypeInfo. Only the classes that directly contain a Map or List need the annotation, so you don't have to describe a complete hierarchy of deeply nested classes by hand, and TypeInformation.of(MyPojo.class) just works.

It is a bit verbose though.

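// Imports used across the classes in this answer
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

@TypeInfo(OtherPojoTypeInfoFactory.class)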
public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

@TypeInfo(MyPojoTypeInfoFactory.class)
public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojoTypeInfoFactory extends TypeInfoFactory<OtherPojo> {
    @Override
    public TypeInformation<OtherPojo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return Types.POJO(
                OtherPojo.class,
                Map.of(
                        "myMap", Types.MAP(Types.STRING, Types.STRING)
                )
        );
    }
}

public class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
    @Override
    public TypeInformation<MyPojo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return Types.POJO(
                MyPojo.class,
                Map.of(
                        "myList", Types.LIST(Types.STRING),
                        "otherPojo", TypeInformation.of(OtherPojo.class)
                )
        );
    }
}

Upvotes: 0

Arvid Heise

Reputation: 3634

Marius already explained the cause well, although I don't see why Flink does not support your use case out of the box. Nevertheless, here is a solution that works right now.

// create type info
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));

Note that the terrible access pattern in the test code is just for a quick and dirty demonstration.
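
One caveat about the final assert: it can only pass if MyPojo and OtherPojo override equals (and, by convention, hashCode). Without that, Object.equals compares references, and a deserialized clone is never the same object as the original. Plain Java assert statements are also skipped unless the JVM runs with -ea. Below is a minimal sketch of the equality methods using only the JDK. The wrapper class PojoEqualityDemo and the nested-class layout are my own for illustration; the fields mirror the question's POJOs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical wrapper class, used here only to keep the sketch in one file.
class PojoEqualityDemo {

    public static class OtherPojo {
        private Map<String, String> myMap = new HashMap<>();

        public Map<String, String> getMyMap() { return myMap; }
        public void setMyMap(Map<String, String> myMap) { this.myMap = myMap; }

        // Value-based equality: two OtherPojos are equal if their maps are equal.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof OtherPojo)) return false;
            return Objects.equals(myMap, ((OtherPojo) o).myMap);
        }

        @Override
        public int hashCode() { return Objects.hash(myMap); }
    }

    public static class MyPojo {
        private List<String> myList = new ArrayList<>();
        private OtherPojo otherPojo = new OtherPojo();

        public List<String> getMyList() { return myList; }
        public void setMyList(List<String> myList) { this.myList = myList; }
        public OtherPojo getOtherPojo() { return otherPojo; }
        public void setOtherPojo(OtherPojo otherPojo) { this.otherPojo = otherPojo; }

        // Compares the list and delegates to OtherPojo.equals for the nested object.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MyPojo)) return false;
            MyPojo other = (MyPojo) o;
            return Objects.equals(myList, other.myList)
                    && Objects.equals(otherPojo, other.otherPojo);
        }

        @Override
        public int hashCode() { return Objects.hash(myList, otherPojo); }
    }

    public static void main(String[] args) {
        MyPojo original = new MyPojo();
        original.getMyList().add("test");
        original.getOtherPojo().getMyMap().put("ping", "pong");

        // Stand-in for a deserialized clone: a distinct object with the same contents.
        MyPojo copy = new MyPojo();
        copy.getMyList().add("test");
        copy.getOtherPojo().getMyMap().put("ping", "pong");

        System.out.println(original.equals(copy)); // prints "true"
    }
}
```

With these overrides in place, comparing the original against the deserialized instance checks contents rather than identity, which is what the round-trip test is meant to verify.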

Upvotes: 10

Marius Jaraminas

Reputation: 881

If you do:

env.getConfig().disableGenericTypes();

Flink will throw an exception whenever it encounters a data type that would go through Kryo.

In that case you have to provide the serializer yourself. You do not need to implement one from scratch: a TypeSerializer can be created by calling typeInfo.createSerializer(config) on the TypeInformation object.

For generic types, you need to “capture” the generic type information via the TypeHint, in your case for a list:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});
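
The trailing {} matters here: it creates an anonymous subclass, and the JVM keeps the generic superclass of a subclass available through reflection even though type arguments of ordinary objects are erased. As far as I know, TypeHint relies on this standard Java idiom. The sketch below reproduces the idiom with JDK classes only; Capture and TypeCaptureSketch are hypothetical names for illustration and are not part of Flink.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical demo class; it illustrates the Java idiom only, not Flink's implementation.
class TypeCaptureSketch {

    // Stand-in for a type hint: must be instantiated as an anonymous subclass.
    public abstract static class Capture<T> {
        private final Type captured;

        protected Capture() {
            // For "new Capture<List<String>>() {}" this is the parameterized type
            // Capture<List<String>>, which survives erasure in the class metadata.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create Capture as an anonymous subclass with a type argument");
            }
            // The single type argument T is the type we wanted to capture.
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public Type getCaptured() { return captured; }
    }

    public static void main(String[] args) {
        Type listOfString = new Capture<List<String>>() {}.getCaptured();
        System.out.println(listOfString.getTypeName()); // prints "java.util.List<java.lang.String>"
    }
}
```

Dropping the braces would not compile here because Capture is abstract, which is the point: without a subclass there is no generic superclass to read the element type from.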

See also the ListTypeInfo class (org.apache.flink.api.java.typeutils.ListTypeInfo).

More details here.

Upvotes: 8
