Fray

Reputation: 183

How to convert a Table to a DataStream containing array types (Flink)?

I have an issue with the Table API of Flink (1.13+). I have a POJO containing several fields, one of them being:

List<String> my_list; 

I create my table using the following declaration for this field:

"CREATE TABLE my_table (
   ...
   my_list ARRAY<STRING>,
   ...
)"

When I try to convert my table to a DataStream using the toRetractStream[MY_POJO] method afterwards, I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for unregistered table do not match.

Cause: Incompatible types for sink column 'my_list' at position 11. Query schema: [..., my_list: ARRAY, ...] Sink schema: [..., my_list: RAW('java.util.List', ?), ...]

I would like to avoid mapping every field by hand and keep the code clean. Is there a solution for handling this kind of data type?
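For context, here is a minimal sketch of the failing conversion (MY_POJO stands in for the actual class, and the environment setup is abbreviated):

Table myTable = tableEnv.sqlQuery("SELECT * FROM my_table");

// Fails: the ARRAY<STRING> column cannot be written into the
// RAW('java.util.List', ?) type derived for the List<String> POJO field.
DataStream<Tuple2<Boolean, MY_POJO>> stream =
    tableEnv.toRetractStream(myTable, MY_POJO.class);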

Upvotes: 7

Views: 2402

Answers (2)

mrdinklage

Reputation: 84

I have a similar example that expands on the question.

Let's say we use POJOs with the following structure:

class Feature {
  private String name;
  private String value;
  // getters setters
}
class Entity {
  private String name;
  private String value;
  // getters setters
}
class FeatureEntityValue {
  private List<Feature> features;
  private List<Entity> entities;
  private long event_time;
  private String version;
  // getters setters
}

And we have a DataStream:

DataStream<FeatureEntityValue> features = ... ;

So when I want to convert this stream into a table:

var table = tableEnv.fromDataStream(features);
table.printSchema();

the schema will be:

(
  `features` RAW('java.util.List', '...'),
  `entities` RAW('java.util.List', '...'),
  `event_time` BIGINT NOT NULL,
  `version` STRING
)

which can barely be manipulated. For instance, I'd like to flatten this table with a SQL query:

tableEnv.createTemporaryView("result_table", table);
// Note the trailing spaces inside the string literals: without them the
// concatenated statement would read "result_tableCROSS JOIN ...".
var flattenedTable = tableEnv.sqlQuery(
  "SELECT * FROM result_table " +
  "CROSS JOIN UNNEST(features) AS FeatureTable (f_name, f_value) " +
  "CROSS JOIN UNNEST(entities) AS EntityTable (e_name, e_value)");

Then you'll see this error:

Cannot apply 'UNNEST' to arguments of type 'UNNEST(<RAW('JAVA.UTIL.LIST', '...')>)'. Supported form(s): 'UNNEST(<MULTISET>)'

Alright, let's set the conversion schema explicitly:

var schema = Schema.newBuilder()
.column("version", DataTypes.STRING())
.column("event_time", DataTypes.BIGINT())
.column(
  "entities", DataTypes.ARRAY(
    DataTypes.ROW(
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("value", DataTypes.STRING())
    )
  )
)
.column(
  "features", DataTypes.ARRAY(
    DataTypes.ROW(
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("value", DataTypes.STRING())
    )
  )
)
.build();
tableEnv.fromDataStream(features, schema);

This will then show us:

Cannot cast "java.util.List" to "org.apache.flink.types.Row[]"

If I change DataTypes.ARRAY to DataTypes.MULTISET, there will be an error:

class java.util.ArrayList cannot be cast to class java.util.Map

Alright, we have the POJO definition, so maybe we can reuse it:

DataTypes.ARRAY(
  DataTypes.STRUCTURED(
    Entity.class,
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("value", DataTypes.STRING())
  )
)

Nope, there is a similar error:

Cannot cast "java.util.List" to "Entity[]"

What I came up with is converting into a Flink tuple, explicitly transforming each list into an array, and only then registering it as a table:

var ti = new TupleTypeInfo<Tuple4<String, Long, Feature[], Entity[]>>(
  TypeInformation.of(String.class),
  TypeInformation.of(Long.class),
  ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Feature.class)),
  ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(Entity.class))
);

var converted = features.map((event) ->
  Tuple4.of(
    event.getVersion(),
    event.getEventTime(),
    event.getFeatures().toArray(new Feature[] {}),
    event.getEntities().toArray(new Entity[] {})
  )
).returns(ti);

var resultTable = tableEnv.fromDataStream(converted)
  .as("version", "event_time", "features", "entities");

And only then can I apply the flattening via SQL. I don't see a more convenient way yet; I might be missing something.
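For completeness, a sketch of the flattening applied to the converted table (the view name converted_table is a placeholder of mine):

// The tuple-backed table exposes 'features' and 'entities' as proper
// array columns, so the UNNEST query from above now goes through.
tableEnv.createTemporaryView("converted_table", resultTable);
var flattened = tableEnv.sqlQuery(
  "SELECT * FROM converted_table " +
  "CROSS JOIN UNNEST(features) AS FeatureTable (f_name, f_value) " +
  "CROSS JOIN UNNEST(entities) AS EntityTable (e_name, e_value)");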

Upvotes: 1

twalthr

Reputation: 2664

I would recommend trying out the new API methods from/toDataStream and from/toChangelogStream. They support all kinds of classes and data types.

toDataStream also supports mapping to POJOs with List members.
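A minimal sketch of that approach, reusing the names from the question (the table environment setup is assumed):

// toDataStream (Flink 1.13+) can map an ARRAY<STRING> column back onto a
// List<String> POJO field, which the legacy toRetractStream could not.
Table myTable = tableEnv.from("my_table");
DataStream<MY_POJO> stream = tableEnv.toDataStream(myTable, MY_POJO.class);

toChangelogStream plays the same role for updating results.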

Upvotes: 0
