Reputation: 315
I have a Flink table, let's say `CREATE TABLE source(id INT, name STRING) WITH (...)`, and a destination table, let's say `CREATE TABLE destination(id INT, unique_name STRING) WITH (...)`. `unique_name` is calculated using business logic in an internal Flink process function, so we can safely assume that the source schema (column names and data types) is exactly the same as the destination schema.
I did some low-level processing using the DataStream API to get the destination datastream. It has output type `GenericType<org.apache.flink.types.Row>`. When I convert the destination datastream back to a table, I get the error below:
```
org.apache.flink.table.api.ValidationException: Column types of query result and sink
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.
Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema: [id: INT, name: STRING]
```
Although I'm able to resolve this issue using the code below, I want to make this generic and get the row `TypeInformation` from the destination `Table`. Is there any way to get `TypeInformation` from a Flink `Table`?
```java
tableEnv.fromDataStream(
    destinationDataStream.map(x -> x).returns(Types.ROW(Types.INT, Types.STRING)));
```
Upvotes: 2
Views: 2858
Reputation: 2664
The Table API type system is richer than `TypeInformation`. If you are OK with using internal classes, you can use `org.apache.flink.table.runtime.typeutils.ExternalTypeInfo`. It is a `TypeInformation` that can be configured using the Table API's `DataType`.
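A minimal sketch of this approach, assuming `destinationTable` is the registered destination `Table` and `destinationDataStream` is the processed stream from the question (both names are placeholders), and that your Flink version exposes `Table.getResolvedSchema()`:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Derive the physical row DataType from the destination table's resolved schema.
DataType rowDataType = destinationTable.getResolvedSchema().toPhysicalRowDataType();

// Wrap it in a TypeInformation via the internal ExternalTypeInfo.
TypeInformation<Row> rowTypeInfo = ExternalTypeInfo.of(rowDataType);

// The stream can now be typed generically instead of hard-coding Types.ROW(...).
DataStream<Row> typed = destinationDataStream.map(x -> x).returns(rowTypeInfo);
Table result = tableEnv.fromDataStream(typed);
```

Because `ExternalTypeInfo` lives in a `runtime.typeutils` package, it is internal and may change between Flink versions.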
If you'd like to stay on the officially supported API, you can declare the input and output types with `TypeInformation` and use `DataTypes.of(TypeInformation)` when calling `StreamTableEnvironment.toDataStream(..., DataType)`.
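A hedged sketch of the public-API route (again, `destinationTable`, `sourceTable`, and `tableEnv` are placeholder names from the question's setup):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Option 1: take the target DataType straight from the destination table's
// schema and pass it to toDataStream, so the Rows carry the sink's exact
// field names and types.
DataType targetType = destinationTable.getResolvedSchema().toPhysicalRowDataType();
DataStream<Row> stream = tableEnv.toDataStream(sourceTable, targetType);

// Option 2: if you already have a TypeInformation, bridge it to a DataType
// with DataTypes.of(...) and pass that instead.
DataStream<Row> stream2 =
    tableEnv.toDataStream(sourceTable, DataTypes.of(Types.ROW(Types.INT, Types.STRING)));
```

Either way, the round trip back via `tableEnv.fromDataStream(...)` then produces a query schema that matches the sink schema instead of a single `RAW` column.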
Upvotes: 3