superman

Reputation: 1

How do I connect Apache Flink to TDengine database?

I want to connect Flink to a TDengine database, and I configured the connection successfully.

However, when I restart a job, it reports:

Native Library /usr/local/taos/driver/libtaos.so.2.0.8.2 already loaded in another classloader

Has anyone successfully connected Flink with TDengine? Could you share your experience? Thank you.

Upvotes: 0

Views: 105

Answers (1)

Xiao Ping

Reputation: 301

Here is the sample code from this post (a Spark JDBC example): https://www.taosdata.com/blog/2022/05/30/9165.html

package com.taosdata.java;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkTest {
    public static void main(String[] args) {
        // Database configuration
        String url = "jdbc:TAOS://u05:6030/tt?user=root&password=taosdata";
        String driver = "com.taosdata.jdbc.TSDBDriver";

        SparkSession sparkSession = SparkSession.builder()
                .appName("DataSourceJDBC") // set the application name
                .master("local")           // run locally with a single thread
                .getOrCreate();

        // Create a DataFrame
        Dataset<Row> df = sparkSession
                .read()         // returns a DataFrameReader for reading non-streaming data as a DataFrame
                .format("jdbc") // JDBC data source
                .option("url", url)
                .option("driver", driver)
                .option("query", "select * from tt.meters limit 100") // either a SQL query or a table name
                .load();
        // Display the contents of the DataFrame
        df.show();

        df.write()              // returns a DataFrameWriter for writing the DataFrame to an external storage system
                .format("jdbc") // JDBC data source
                .mode(SaveMode.Append) // the table is created on the first write; later writes append
                .option("url", url)
                .option("driver", driver)
                .option("dbtable", "test.meters") // target table name
                .save();

        sparkSession.stop();
    }
}
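
As for the Flink error itself: "Native Library ... already loaded in another classloader" typically means the JNI-based TSDBDriver loads libtaos.so inside the job's user classloader, and a restarted job gets a fresh classloader while the native library stays bound to the old one. Two common workarounds (my suggestions, not from the linked post) are to place the taos-jdbcdriver jar in Flink's lib/ directory (or add com.taosdata.jdbc to classloader.parent-first-patterns.additional) so the driver is loaded once by the parent classloader, or to switch to TDengine's RESTful JDBC driver (com.taosdata.jdbc.rs.RestfulDriver, URL scheme jdbc:TAOS-RS:// on port 6041), which is pure Java and never loads libtaos.so. Below is a minimal, untested sketch of the second approach using the flink-connector-jdbc sink; the host u05 and database tt come from the code above, while the tt.meters(ts, current) column layout is only an assumed example schema.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;

public class FlinkTDengineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample in-memory stream of (timestamp, current) pairs; replace with a real source.
        env.fromElements(
                Tuple2.of(new Timestamp(System.currentTimeMillis()), 10.5f),
                Tuple2.of(new Timestamp(System.currentTimeMillis() + 1000), 11.2f))
           .addSink(JdbcSink.sink(
                   // Parameterized INSERT into an assumed table tt.meters(ts, current)
                   "INSERT INTO tt.meters (ts, current) VALUES (?, ?)",
                   (statement, value) -> {
                       statement.setTimestamp(1, value.f0);
                       statement.setFloat(2, value.f1);
                   },
                   JdbcExecutionOptions.builder().withBatchSize(100).build(),
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           // RESTful driver (jdbc:TAOS-RS://, port 6041) speaks HTTP,
                           // so it does not load libtaos.so in the job classloader.
                           .withUrl("jdbc:TAOS-RS://u05:6041/tt?user=root&password=taosdata")
                           .withDriverName("com.taosdata.jdbc.rs.RestfulDriver")
                           .build()));

        env.execute("flink-tdengine-jdbc-sketch");
    }
}

With the RESTful driver there is no native library involved at all, so restarting the job should no longer trigger the classloader error; if you need the native driver for performance, the lib/ directory approach is the usual fix.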

Upvotes: 0
