Reputation: 10393
My Flink pipeline currently uses a POJO that contains some Lists and Maps (of Strings), along the lines of:
public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}
For performance reasons, I want to avoid Kryo serialization, so I disabled the generic-type fallback with env.getConfig().disableGenericTypes(), as described in the Flink documentation.
Now, Flink complains about the lists:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
...
What is the preferred way of serializing such simple lists and maps in Flink? Internally, these are currently ArrayList and HashMap, but other implementations would also be fine. There seems to be a class org.apache.flink.api.common.typeutils.base.ListSerializer in Flink, but I do not know how to use it.
Upvotes: 7
Views: 9375
Reputation: 3439
It is possible to annotate any class with @TypeInfo. Only the classes that contain a Map or List need to be annotated, so you don't have to manually write type information for a complete hierarchy of deeply nested classes, and TypeInformation.of(MyPojo.class) just works.
It is a bit verbose, though.
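// Imports used by the classes below (each public class goes in its own file; Map.of needs Java 9+)
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

@TypeInfo(OtherPojoTypeInfoFactory.class)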
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}
@TypeInfo(MyPojoTypeInfoFactory.class)
public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojoTypeInfoFactory extends TypeInfoFactory<OtherPojo> {
@Override
public TypeInformation<OtherPojo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
return Types.POJO(
OtherPojo.class,
Map.of(
"myMap", Types.MAP(Types.STRING, Types.STRING)
)
);
}
}
public class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {
@Override
public TypeInformation<MyPojo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
return Types.POJO(
MyPojo.class,
Map.of(
"myList", Types.LIST(Types.STRING),
"otherPojo", TypeInformation.of(OtherPojo.class)
)
);
}
}
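If you are unsure which classes in a larger hierarchy actually need such a factory, a rough stdlib-only sketch like the one below lists the fields whose declared type is exactly the java.util.List or java.util.Map interface, which is how List showed up as a generic type in the question's stack trace. The class and method names are hypothetical, and this only approximates Flink's type extraction: it ignores other field types that might also fall back to Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: lists the fields of a class whose
// declared type is the List or Map interface.
class CollectionFieldFinder {

    static List<String> collectionFields(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        // Walk up the class hierarchy, since inherited fields are serialized too.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                // Static and transient fields are not part of a Flink POJO.
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue;
                }
                Class<?> type = f.getType();
                if (type == List.class || type == Map.class) {
                    result.add(c.getSimpleName() + "." + f.getName());
                }
            }
        }
        return result;
    }

    // Stand-ins mirroring the classes from the question.
    static class OtherPojo {
        private Map<String, String> myMap = new HashMap<>();
    }

    static class MyPojo {
        private List<String> myList = new ArrayList<>();
        private OtherPojo otherPojo = new OtherPojo();
    }

    public static void main(String[] args) {
        System.out.println(collectionFields(MyPojo.class));    // [MyPojo.myList]
        System.out.println(collectionFields(OtherPojo.class)); // [OtherPojo.myMap]
    }
}
```

Each class reported this way is a candidate for its own @TypeInfo factory.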
Upvotes: 0
Reputation: 3634
Marius already explained the reason beautifully, although I don't see why Flink does not support your use case out of the box. Nevertheless, here is a solution that works right now.
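import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// create type info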
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class,
ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));
// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");
final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);
DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));
Note that the terrible access pattern in the test code is just for a quick and dirty demonstration.
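The final assert only passes if MyPojo and OtherPojo override equals (and, by convention, hashCode); otherwise Object.equals compares identities, and the deserialized copy is never equal to the original. Also note that Java assert statements are skipped unless the JVM runs with -ea. Below is a minimal sketch of value-based equality, assuming the fields from the question; the wrapper class EqualsSketch is hypothetical and only keeps the example in one file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class EqualsSketch {

    public static class OtherPojo {
        private Map<String, String> myMap = new HashMap<>();

        public Map<String, String> getMyMap() { return myMap; }
        public void setMyMap(Map<String, String> myMap) { this.myMap = myMap; }

        // Two instances are equal if their maps contain the same entries.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof OtherPojo)) return false;
            return Objects.equals(myMap, ((OtherPojo) o).myMap);
        }

        @Override
        public int hashCode() { return Objects.hash(myMap); }
    }

    public static class MyPojo {
        private List<String> myList = new ArrayList<>();
        private OtherPojo otherPojo = new OtherPojo();

        public List<String> getMyList() { return myList; }
        public void setMyList(List<String> myList) { this.myList = myList; }
        public OtherPojo getOtherPojo() { return otherPojo; }
        public void setOtherPojo(OtherPojo otherPojo) { this.otherPojo = otherPojo; }

        // Compares the list element by element and delegates to OtherPojo.equals.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MyPojo)) return false;
            MyPojo other = (MyPojo) o;
            return Objects.equals(myList, other.myList)
                && Objects.equals(otherPojo, other.otherPojo);
        }

        @Override
        public int hashCode() { return Objects.hash(myList, otherPojo); }
    }
}
```

Because ArrayList.equals and HashMap.equals compare contents, a deserialized copy with the same elements compares equal even though it is a different object.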
Upvotes: 10
Reputation: 881
If you call
env.getConfig().disableGenericTypes();
Flink will raise an exception whenever it encounters a data type that would be serialized with Kryo.
In that case you need a serializer of your own, i.e. a TypeSerializer; to create one, simply call typeInfo.createSerializer(config) on the TypeInformation object.
For generic types, you need to “capture” the generic type information via the TypeHint, in your case for a list:
TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});
More details in here.
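The trailing {} in the TypeHint snippet creates an anonymous subclass, and that is what makes the capture possible: Java erases type arguments from objects at runtime, but the generic superclass of a subclass is recorded in its class file and can be read back via reflection. The stdlib-only sketch below shows this "super type token" technique with a hypothetical TypeCapture class; it illustrates the principle TypeHint relies on, not Flink's actual implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Hypothetical "super type token": subclass it anonymously to capture T.
abstract class TypeCapture<T> {
    private final Type type;

    protected TypeCapture() {
        // For `new TypeCapture<List<String>>() {}`, the anonymous class's
        // generic superclass is TypeCapture<List<String>>.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeCapture must be subclassed with a type argument");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }

    public static void main(String[] args) {
        // Both calls recover the full parameterized type, not just List or Map.
        Type listType = new TypeCapture<List<String>>() {}.getType();
        Type mapType = new TypeCapture<Map<String, String>>() {}.getType();
        System.out.println(listType);
        System.out.println(mapType);
    }
}
```

Without the anonymous subclass (for example a plain `List.class`), only the raw type is available, and the element type String is lost.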
Upvotes: 8