Reputation: 6332
I am using Flink 1.12, and I am confused about when conversion between a Table and a DataSet/DataStream can be performed. In the following code, I want to print the table content to the console. I tried the following three ways, and none of them works.
How can I print the table content to the console, e.g. using the print method?
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment, TableResult}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row
object Sql021_PlannerOldBatchTest {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val env = TableEnvironment.create(settings)
    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.DOUBLE())
    env.connect(new FileSystem().path("D:/stock.csv")).withSchema(schema).withFormat(fmt).createTemporaryTable("sourceTable")
    val table = env.sqlQuery("select * from sourceTable")
    // ERROR: Only tables that originate from Scala DataSets can be converted to Scala DataSets.
    // table.toDataSet[Row].print()
    // ERROR: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    table.toAppendStream[Row].print()
    // ERROR: Table does not have a print method
    // table.print()
  }
}
Upvotes: 0
Views: 444
Reputation: 43454
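In the streaming case, this will work (here tenv is a StreamTableEnvironment and env is the StreamExecutionEnvironment it was created from):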
tenv.toAppendStream(table, classOf[Row]).print()
env.execute()
In the batch case, where env is the TableEnvironment from the question, you can do:
val tableResult: TableResult = env.executeSql("select * from sourceTable")
tableResult.print()
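For completeness, here is a minimal sketch of the batch case as a whole program. It assumes Flink 1.12 with the Blink planner and the flink-csv format on the classpath. The object name PrintTableBatchSketch is made up for illustration, and the table is declared with a DDL statement instead of env.connect(...) only to keep the example short. The path and column names are copied from the question. Table.execute() returns the same kind of TableResult as executeSql, so it can be printed directly.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical object name, used only for this sketch.
object PrintTableBatchSketch {
  def main(args: Array[String]): Unit = {
    // Same settings as in the question: Blink planner, batch mode.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val env = TableEnvironment.create(settings)

    // Assumption: the CSV file has three comma-separated columns matching
    // the schema from the question (a STRING, b STRING, c DOUBLE).
    env.executeSql(
      """
        |CREATE TEMPORARY TABLE sourceTable (
        |  a STRING,
        |  b STRING,
        |  c DOUBLE
        |) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'D:/stock.csv',
        |  'format' = 'csv'
        |)
        |""".stripMargin)

    // sqlQuery only builds the Table; execute() runs the job and returns a
    // TableResult, whose print() writes the rows to the console.
    val table = env.sqlQuery("select * from sourceTable")
    table.execute().print()
  }
}
```

No DataSet or DataStream conversion is involved here, which is why this works with a plain TableEnvironment. The toDataSet and toAppendStream conversions need a table created from the corresponding bridge environment (BatchTableEnvironment or StreamTableEnvironment), which is what the error messages in the question are pointing at.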
Upvotes: 1