Reputation: 183
I have issues concerning the table-api of Flink (1.13+). I have a POJO containing several fields, one of them being:
List<String> my_list;
I create my table using the following declaration for this field:
"CREATE TABLE my_table (
...
my_list ARRAY<STRING>,
...
)"
When I then try to convert my table to a DataStream using the toRetractStream[MY_POJO] method, I get the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for unregistered table do not match.
Cause: Incompatible types for sink column 'my_list' at position 11. Query schema: [..., my_list: ARRAY, ...] Sink schema: [..., my_list: RAW('java.util.List', ?), ...]
I would like to avoid mapping every field by hand and keep the code clean. Is there a solution for handling this kind of data type?
Upvotes: 7
Views: 2402
Reputation: 84
I have a similar example that expands on the question.
Let's say we use POJOs with such structures:
class Feature {
private String name;
private String value;
// getters setters
}
class Entity {
private String name;
private String value;
// getters setters
}
class FeatureEntityValue {
private List<Feature> features;
private List<Entity> entities;
private long event_time;
private String version;
// getters setters
}
And we have a DataStream:
DataStream<FeatureEntityValue> features = ... ;
Now, when I convert this stream into a table:
var table = tableEnv.fromDataStream(features);
table.printSchema();
the schema will be:
(
`features` RAW('java.util.List', '...'),
`entities` RAW('java.util.List', '...'),
`event_time` BIGINT NOT NULL,
`version` STRING
)
which can barely be manipulated. For instance, I'd like to flatten this table and apply a SQL query:
tableEnv.createTemporaryView("result_table", table);
var flattenedTable = tableEnv.sqlQuery(
"SELECT * FROM result_table " +
"CROSS JOIN UNNEST(features) AS FeatureTable (f_name, f_value) " +
"CROSS JOIN UNNEST(entities) AS EntityTable (e_name, e_value)");
Then you'll see this error
Cannot apply 'UNNEST' to arguments of type 'UNNEST(<RAW('JAVA.UTIL.LIST', '...')>)'. Supported form(s): 'UNNEST(<MULTISET>)'
Alright, let's set the conversion schema explicitly:
var schema = Schema.newBuilder()
.column("version", DataTypes.STRING())
.column("event_time", DataTypes.BIGINT())
.column(
"entities", DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.STRING())
)
)
)
.column(
"features", DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.STRING())
)
)
)
.build();
tableEnv.fromDataStream(features, schema);
will then show us
Cannot cast "java.util.List" to "org.apache.flink.types.Row[]"
If I change DataTypes.ARRAY to DataTypes.MULTISET, there will be an error:
class java.util.ArrayList cannot be cast to class java.util.Map
Alright, we have the POJO definitions; maybe we can reuse them:
DataTypes.ARRAY(
DataTypes.STRUCTURED(
Entity.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.STRING())
)
)
Nope, there is a similar error
Cannot cast "java.util.List" to "Entity[]"
What I came up with is converting into a Flink tuple first, and only then registering it as a table, with an explicit transformation of the lists into arrays:
var ti = new TupleTypeInfo<Tuple4<String, Long, Feature[], Entity[]>>(
TypeInformation.of(String.class),
TypeInformation.of(Long.class),
ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Feature.class)),
ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Entity.class))
);
var converted = features.map((event) ->
Tuple4.of(
event.getVersion(),
event.getEventTime(),
event.getFeatures().toArray(new Feature[] {}),
event.getEntities().toArray(new Entity[] {})
)
).returns(ti);
var resultTable = tableEnv.fromDataStream(converted)
.as("version", "event_time", "features", "entities");
And only then can I apply the flattening via SQL. I don't see a more convenient way yet; I might be missing something.
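For reference, the List-to-array step at the heart of this workaround is plain Java; a minimal standalone sketch of just that step (the Feature stand-in class below is an assumption for illustration, mirroring the POJO above):

```java
import java.util.Arrays;
import java.util.List;

public class ListToArrayDemo {
    // Minimal stand-in for the Feature POJO above
    static class Feature {
        final String name;
        final String value;
        Feature(String name, String value) { this.name = name; this.value = value; }
    }

    public static void main(String[] args) {
        List<Feature> features = Arrays.asList(
            new Feature("f1", "v1"),
            new Feature("f2", "v2"));
        // Passing a zero-length array lets toArray allocate one of the right size
        Feature[] asArray = features.toArray(new Feature[0]);
        System.out.println(asArray.length); // prints 2
        System.out.println(asArray[0].name); // prints f1
    }
}
```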
Upvotes: 1
Reputation: 2664
I would recommend trying out the new API methods from/toDataStream and from/toChangelogStream. Those support all kinds of classes and data types. toDataStream also supports mapping to POJOs with List members.
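A minimal sketch of that conversion path, assuming a POJO with a List<String> member like in the question (the class and field names are illustrative, and this requires the flink-table-api-java-bridge dependency on the classpath):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ConversionSketch {
    // Hypothetical POJO with a List member, mirroring the question
    public static class MyPojo {
        public List<String> myList;
        public MyPojo() {}
        public MyPojo(List<String> myList) { this.myList = myList; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<MyPojo> stream = env.fromElements(new MyPojo(Arrays.asList("a", "b")));

        // fromDataStream derives the table schema from the POJO by reflection
        Table table = tableEnv.fromDataStream(stream);

        // toDataStream converts back to the POJO, List member included
        DataStream<MyPojo> back = tableEnv.toDataStream(table, MyPojo.class);
        back.print();
        env.execute();
    }
}
```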
Upvotes: 0