Seeking advice on the issue below.
We are running a PoC to evaluate Flink and Doris integration, using the versions and dependencies below.
Doris is up and running (set up by following the Quick Start in the official Doris documentation), and the FE is reachable at http://10.0.2.15:8030.
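The catalog connects to the FE's MySQL-protocol query port (9030 in the `base-url`), while the sink's `fenodes` uses the HTTP port (8030). Being able to open the web UI only proves that the HTTP port is reachable. Below is a minimal stdlib sketch for checking both ports from the machine that runs the Flink code. The class name `DorisEndpointCheck` is hypothetical, and the check only tests TCP reachability, not credentials.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check: do the Doris FE ports accept TCP connections?
// 8030 = FE HTTP port (used by 'fenodes'), 9030 = FE MySQL-protocol query port
// (used by the jdbc:mysql:// base-url of the catalog).
final class DorisEndpointCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        String feHost = "10.0.2.15";
        for (int port : new int[] {8030, 9030}) {
            System.out.printf("%s:%d reachable: %b%n", feHost, port, isReachable(feHost, port, 2000));
        }
    }
}
```

If 8030 is reachable but 9030 is not, the catalog cannot work no matter which options are passed.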
Below is the PoC code snippet (a Java Maven project). It reads CSV files through a Flink FileSystem source table and inserts the rows into a Doris table through the Doris sink:
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDorisIntegration {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second in exactly-once mode, storing checkpoints on the local file system
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Flink FileSystem CSV source table in the default in-memory catalog.
        // executeSql() runs a single statement, so the SQL strings carry no trailing ';'.
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS `default_catalog`.`default_database`.`fs_src` (" +
                " station STRING, txdate STRING, txtime STRING, txseq STRING" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///home/user/Documents/app_data/source/'," +
                " 'format' = 'csv'," +
                " 'csv.ignore-parse-errors' = 'true'," +
                " 'csv.allow-comments' = 'true'," +
                " 'source.monitor-interval' = '1s'" +
                ")");

        // Create the Doris catalog (this statement throws an IllegalArgumentException)
        tableEnv.executeSql(
                "CREATE CATALOG demo_catalog WITH (" +
                " 'type' = 'jdbc'," +
                " 'default-database' = 'db_test'," +
                " 'username' = 'root'," +
                " 'password' = ''," +
                " 'base-url' = 'jdbc:mysql://10.0.2.15:9030'" +
                ")");

        // Switch to the Doris catalog
        tableEnv.executeSql("USE CATALOG demo_catalog");

        // Create the Doris sink table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (" +
                " station VARCHAR, txdate VARCHAR, txtime VARCHAR, txseq VARCHAR" +
                ") WITH (" +
                " 'connector' = 'doris'," +
                " 'fenodes' = '10.0.2.15:8030'," +
                " 'table.identifier' = 'db_test.flink_doris_sink'," +
                " 'username' = 'root'," +
                " 'password' = ''," +
                " 'sink.label-prefix' = 'doris_label'" +
                ")");

        // Insert into the Doris table. executeSql() submits the INSERT as its own job,
        // so env.execute() is not needed (with no DataStream operators defined it fails
        // with "No operators defined in streaming topology"). await() blocks main()
        // so the job keeps running when launched from IntelliJ.
        TableResult insertResult = tableEnv.executeSql(
                "INSERT INTO demo_catalog.db_test.flink_doris_sink (station, txdate, txtime, txseq) " +
                "SELECT station, txdate, txtime, txseq " +
                "FROM `default_catalog`.`default_database`.`fs_src`");
        insertResult.await();
    }
}
```
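The INSERT above names both tables with fully qualified `catalog.database.table` identifiers. Flink SQL resolves such identifiers independently of the catalog selected by `USE CATALOG`. Below is a small stdlib sketch of generating such statements. `FlinkIdentifiers` and its methods are hypothetical helpers, and the backtick quoting (with an embedded backtick doubled) is assumed to match Flink SQL's identifier escaping.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helpers for building fully qualified Flink SQL identifiers
final class FlinkIdentifiers {

    /** Quotes one identifier part with backticks; an embedded backtick is doubled. */
    static String quote(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    /** Joins catalog, database and table into `catalog`.`database`.`table`. */
    static String qualified(String catalog, String database, String table) {
        return Stream.of(catalog, database, table)
                .map(FlinkIdentifiers::quote)
                .collect(Collectors.joining("."));
    }

    /** Builds an INSERT ... SELECT copying the same columns between two tables. */
    static String insertSelect(String sinkTable, String sourceTable, String columns) {
        return "INSERT INTO " + sinkTable + " (" + columns + ") SELECT " + columns + " FROM " + sourceTable;
    }

    public static void main(String[] args) {
        String source = qualified("default_catalog", "default_database", "fs_src");
        String sink = qualified("demo_catalog", "db_test", "flink_doris_sink");
        System.out.println(insertSelect(sink, source, "station, txdate, txtime, txseq"));
    }
}
```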
To access the Doris table I need to switch to the Doris catalog, but creating the Doris catalog from Flink throws an IllegalArgumentException. I am running the code from IntelliJ IDEA, not as a jar submitted to a Flink cluster.
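One thing I would like to rule out: as far as I know, Flink's JDBC catalog validates its constructor arguments and throws an IllegalArgumentException when `username`, `password` or `base-url` is blank. The catalog above passes `'password' = ''`. This is an assumption to verify against the exact `flink-connector-jdbc` version in use. The stdlib sketch below (class and method names are hypothetical) checks the options for those conditions before the DDL is handed to `executeSql`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// ASSUMPTION (verify against your flink-connector-jdbc version): the JDBC catalog
// rejects a blank default-database, username, password or base-url with an
// IllegalArgumentException, and expects base-url without a database name.
final class JdbcCatalogOptionCheck {

    // Options assumed to be required and non-blank for 'type' = 'jdbc'
    static final List<String> NON_BLANK_KEYS =
            Arrays.asList("default-database", "username", "password", "base-url");

    /** Lists every option that would likely make CREATE CATALOG fail. */
    static List<String> problems(Map<String, String> options) {
        List<String> found = new ArrayList<>();
        for (String key : NON_BLANK_KEYS) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                found.add("'" + key + "' is missing or blank");
            }
        }
        String baseUrl = options.get("base-url");
        // "jdbc:mysql://host:port" splits into exactly two parts on runs of '/'
        if (baseUrl != null && !baseUrl.trim().isEmpty()
                && baseUrl.trim().split("/+").length != 2) {
            found.add("'base-url' should be jdbc:mysql://host:port without a database");
        }
        return found;
    }

    /** Renders the options as a single CREATE CATALOG statement (no trailing ';'). */
    static String createCatalogDdl(String name, Map<String, String> options) {
        String with = options.entrySet().stream()
                // A single quote inside a SQL string literal is escaped by doubling it
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(", "));
        return "CREATE CATALOG " + name + " WITH (" + with + ")";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("type", "jdbc");
        options.put("default-database", "db_test");
        options.put("username", "root");
        options.put("password", ""); // the value used in the question
        options.put("base-url", "jdbc:mysql://10.0.2.15:9030");

        List<String> problems = problems(options);
        if (problems.isEmpty()) {
            System.out.println(createCatalogDdl("demo_catalog", options));
        } else {
            problems.forEach(p -> System.out.println("Likely rejected: " + p));
        }
    }
}
```

With the options from the question, only the empty `'password'` value is flagged.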
Can someone please help with the following: