Sergey Postument
Sergey Postument

Reputation: 187

Flink create table via table DSL

In order to create table, I use an SQL syntax like

    val tableEnv = StreamTableEnvironment.create(env, settings)
    tableEnv.executeSql(
      "CREATE TABLE asset (smth STRING) " +
        "WITH ('connector' = 'jdbc', " +
        "'url' = 'jdbc:mysql://host:3306/db', " +
        "'username' = 'user', " +
        "'password' = 'pass', " +
        "'table-name' = 'table')"
    ) 

Is there an option to define a table via Table API DSL?

Upvotes: 0

Views: 1640

Answers (2)

Francesco Guardiani
Francesco Guardiani

Reputation: 688

You can create the very same table with the following Table API methods:

    Schema schema =
            Schema.newBuilder()
                    .column("smth", DataTypes.STRING())
                    .build();
    TableDescriptor tableDescriptor =
            TableDescriptor.forConnector("jdbc")
                    .option(JdbcConnectorOptions.URL, "jdbc:mysql://host:3306/db")
                    .option(JdbcConnectorOptions.USERNAME, "user")
                    .option(JdbcConnectorOptions.PASSWORD, "pass")
                    .option(JdbcConnectorOptions.TABLE_NAME, "table")
                    .schema(schema)
                    .build();
    tEnv.createTable("asset", tableDescriptor);

To create a temporary table instead, use tEnv.createTemporaryTable.

Check TableDescriptor and Schema for more details.

Upvotes: 3

David Anderson
David Anderson

Reputation: 43707

What you can do is remove these table definitions from your code and store them in a persistent catalog. But for creating temporary tables in the in-memory catalog, what you're doing is how it's done.

Upvotes: 1

Related Questions