Reputation: 93
I'm trying to use the Flink Table API in Scala. There are no errors at compile time, but when I run the job on my Flink cluster I get: flink.api.table.TableException: Type is not supported:<GenericType<java.lang.Object>
My Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
My imports:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, Table, TableEnvironment}
My code:
// odo: DataStream[(Long, String, Double, Long)]
val inputTable = odo.toTable(tableEnv,'ts,'ty, 'vl, 'dv)
val resultStream: Table = inputTable.where('ty === "Odometer").select('dv)
resultStream.toDataStream[Row].print
Update: I think it could be related to the Flink version (1.0.3), because when I do something like this:
val inputTable = odo.toTable(tableEnv, 'ts, 'ty, 'vl, 'dv)
val result = inputTable.select('dv,'vl.sum).where('dv == 111)
result.toDataStream[Row].print()
I get a different exception: org.apache.flink.api.table.TableException: Aggregate on stream tables is currently not supported.
Any help is appreciated. Thank you.
Upvotes: 1
Views: 2015
Reputation: 2644
Flink's Table API does not support fields that contain a GenericType in 1.1-SNAPSHOT. There is a Pull Request which implements this feature. It is very likely that it will be contained in the Flink 1.1 release.
Regarding your second exception: the message is self-explanatory. Aggregations on stream tables are not supported yet. However, StreamSQL is under development.
Upvotes: 1