gaurav miglani

Reputation: 315

Flink table get type information

I have a Flink table, say CREATE TABLE source(id int, name string) WITH (...), and a destination table, say CREATE TABLE destination(id int, unique_name string) WITH (...). unique_name is calculated using business logic in an internal Flink process function.

So we can safely assume that the source schema is exactly the same (names and data types) as the destination schema. I did some low-level processing using the DataStream API to get the destination DataStream. Its output type is GenericType<org.apache.flink.types.Row>. When I convert the destination DataStream back to a table, I get the error below.

org.apache.flink.table.api.ValidationException: Column types of query result and sink 
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.

Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema:  [id: INT, name: STRING]

I can work around this with the code below, but I want to make it generic and get the row TypeInformation from the destination table. Is there any way to get TypeInformation from a Flink Table?

tableEnv.fromDataStream(destinationDataStream.map(x -> x).returns(Types.ROW(Types.INT, Types.STRING)));

Upvotes: 2

Views: 2858

Answers (1)

twalthr

Reputation: 2664

The table type system is richer than TypeInformation. If you are OK with using internal classes, you can use org.apache.flink.table.runtime.typeutils.ExternalTypeInfo. It is a TypeInformation that can be configured using the Table API's DataType.
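As a rough illustration of the ExternalTypeInfo route, here is a minimal sketch. It assumes Flink 1.13 or later with flink-table-runtime on the classpath. The class name TableTypeInfo and the helper rowTypeInfoOf are hypothetical names made up for this example, and reading the schema via Table.getResolvedSchema() is one possible way to obtain the DataType, not necessarily the only one.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

/** Hypothetical helper class, not part of Flink. */
public final class TableTypeInfo {

    private TableTypeInfo() {}

    /**
     * Derives Row type information from the physical columns of a table
     * registered in the catalog (for example "destination").
     */
    public static TypeInformation<Row> rowTypeInfoOf(
            StreamTableEnvironment tableEnv, String tablePath) {
        // Look up the table; this does not execute anything.
        Table table = tableEnv.from(tablePath);

        // ROW<...> data type built from the table's physical columns.
        DataType rowType = table.getResolvedSchema().toPhysicalRowDataType();

        // ExternalTypeInfo is an internal class and may change between releases.
        return ExternalTypeInfo.of(rowType);
    }
}
```

With such a helper, the hard-coded Types.ROW(...) in the question could in principle become destinationDataStream.map(x -> x).returns(TableTypeInfo.rowTypeInfoOf(tableEnv, "destination")) before calling tableEnv.fromDataStream(...).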

If you would rather use the officially supported API, you can declare the input and output types with TypeInformation and use DataTypes.of(TypeInformation) when calling StreamTableEnvironment.toDataStream(..., DataType).

Upvotes: 3
