Reputation: 161
I am trying to familiarize myself with Apache Iceberg and I'm having some trouble understanding how to write some external data to a table using Spark SQL.
Is it even possible to read external data using Spark SQL and then write it to Iceberg tables? Do I have to use Scala or Python to do this? I've been through the Iceberg and Spark 3.0.1 documentation several times, but maybe I'm missing something.
Code Update
Here is some code that I hope will help
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.local.type", "hadoop")
spark.conf.set("spark.sql.catalog.local.warehouse", "data/warehouse")
The data I need is in the file /one/one.csv
How do I get it into an Iceberg table using Spark? Can all of this be done purely in Spark SQL?
spark.sql(
"""
CREATE or REPLACE TABLE local.db.one
USING iceberg
AS SELECT * FROM `/one/one.csv`
"""
)
The goal is to then work with this Iceberg table directly, for example:
select * from local.db.one
and this would give me all the content from the /one/one.csv file.
Upvotes: 1
Views: 26022
Reputation: 3066
There are two ways to create an Iceberg table using Spark:
1. Using Spark SQL:
This method allows us to define the table schema and properties using SQL syntax.
spark.sql("""
  CREATE TABLE IF NOT EXISTS table1 (
    id bigint,
    data string
  )
  USING iceberg
""")
2. Using the DataFrame API:
This method builds the table from a DataFrame and selects Iceberg as the format with writeTo(...).using("iceberg").
import spark.implicits._
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
data.writeTo("table1")
  .using("iceberg")
  .createOrReplace()
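Applied to the CSV file from the question, the DataFrame route might look like the sketch below (written for spark-shell or a notebook). It assumes the local catalog configured in the question and that /one/one.csv has a header row; the helper name csvToIceberg is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: loads a CSV file and writes it to an Iceberg table.
def csvToIceberg(spark: SparkSession, csvPath: String, table: String): Unit = {
  val df = spark.read
    .option("header", "true")      // assumption: the first row holds column names
    .option("inferSchema", "true") // let Spark guess the column types
    .csv(csvPath)

  df.writeTo(table)
    .using("iceberg")
    .createOrReplace()             // creates the table, or replaces it if it exists
}

// Usage with the names from the question:
// csvToIceberg(spark, "/one/one.csv", "local.db.one")
```

If the file has no header row, dropping the header option leaves Spark's default column names (_c0, _c1, ...).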
Upvotes: 0
Reputation: 9
This example reads data from Kafka and writes it to an Iceberg table:
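import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.{JSON, JSONObject} // assumption: JSON.parseObject below looks like Alibaba fastjson
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
val sparkConf = new SparkConf()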
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive")
sparkConf.set("spark.sql.catalog.hive_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_catalog.type", "hadoop")
sparkConf.set("spark.sql.catalog.hive_catalog.warehouse", "hdfs://host:port/user/hive/warehouse")
sparkConf.set("hive.metastore.uris", "thrift://host:19083")
sparkConf.set("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.hive_prod.type", "hive")
sparkConf.set("spark.sql.catalog.hive_prod.uri", "thrift://host:19083")
sparkConf.set("hive.metastore.warehouse.dir", "hdfs://host:port/user/hive/warehouse")
val spark: SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .master("yarn")
  .appName("kafkaTableTest")
  .getOrCreate()
spark.sql(
"""
|
|create table if not exists hive_catalog.icebergdb.kafkatest1(
| company_id int,
| event string,
| event_time timestamp,
| position_id int,
| user_id int
|)using iceberg
|PARTITIONED BY (days(event_time))
|""".stripMargin)
import spark.implicits._
val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka_server")
  .option("subscribe", "topic")
  .option("startingOffsets", "latest")
  .load()
//.selectExpr("cast (value as string)")
// Parse each Kafka message (a JSON string) into a tuple of typed fields
val value: DataFrame = df.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(data => {
    val json_str: JSONObject = JSON.parseObject(data)
    val company_id: Integer = json_str.getInteger("company_id")
    val event: String = json_str.getString("event")
    val event_time: String = json_str.getString("event_time")
    val position_id: Integer = json_str.getInteger("position_id")
    val user_id: Integer = json_str.getInteger("user_id")
    (company_id, event, event_time, position_id, user_id)
  })
  .toDF("company_id", "event", "event_time", "position_id", "user_id")
value.createOrReplaceTempView("table")
spark.sql(
"""
|select
| company_id,
| event,
| to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time,
| position_id,
| user_id
|from table
|""".stripMargin)
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("path", "hive_catalog.icebergdb.kafkatest1") // tablePath: catalog.db.tableName
  .option("checkpointLocation", "hdfspath")
  .start()
  .awaitTermination()
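The JSON parsing step above could also be done with Spark's built-in from_json function instead of an external JSON library. The sketch below is an alternative technique, not what the answer uses; it assumes the same five fields and the same timestamp pattern, and the names eventSchema and parseEvents are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, to_timestamp}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Schema of the JSON payload, mirroring the columns of kafkatest1.
// event_time is read as a string first and converted to a timestamp below.
val eventSchema: StructType = new StructType()
  .add("company_id", IntegerType)
  .add("event", StringType)
  .add("event_time", StringType)
  .add("position_id", IntegerType)
  .add("user_id", IntegerType)

// Hypothetical helper: turns the raw Kafka DataFrame into typed columns.
def parseEvents(kafkaDf: DataFrame): DataFrame =
  kafkaDf
    .select(from_json(col("value").cast("string"), eventSchema).as("e"))
    .select(
      col("e.company_id"),
      col("e.event"),
      to_timestamp(col("e.event_time"), "yyyy-MM-dd HH:mm:ss").as("event_time"),
      col("e.position_id"),
      col("e.user_id")
    )
```

With this helper, parseEvents(df).writeStream could replace the temp view and the SELECT statement, and the rest of the writer chain would stay the same. Messages that fail to parse yield null columns rather than an exception, so they may need filtering.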
Upvotes: 0
Reputation: 59
To use Spark SQL, read the file into a DataFrame, then register it as a temp view. The temp view can then be referenced in SQL:
var df = spark.read.format("csv").load("/data/one.csv")
df.createOrReplaceTempView("tempview");
spark.sql("CREATE or REPLACE TABLE local.db.one USING iceberg AS SELECT * FROM tempview");
To answer your other question, Scala or Python is not required; the above example is in Java.
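If the goal is to keep everything in SQL, the temp view itself can also be declared with a SQL statement, so the only host-language code left is the call that submits the statements. The sketch below is in Scala for consistency with the other answers; it assumes the local catalog from the question and a header row in the file, and the names one_csv and loadCsvIntoIceberg are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Statement 1: expose the CSV file as a temporary view, using SQL only.
val createView: String =
  """CREATE OR REPLACE TEMPORARY VIEW one_csv
    |USING csv
    |OPTIONS (path '/one/one.csv', header 'true', inferSchema 'true')""".stripMargin

// Statement 2: create (or replace) the Iceberg table from that view.
val createTable: String =
  """CREATE OR REPLACE TABLE local.db.one
    |USING iceberg
    |AS SELECT * FROM one_csv""".stripMargin

// Hypothetical helper: runs both statements in order.
def loadCsvIntoIceberg(spark: SparkSession): Unit = {
  spark.sql(createView)
  spark.sql(createTable)
}
```

The same two statements should also work when typed into the spark-sql shell, provided the catalog settings are passed at startup. Spark additionally accepts a path directly in the FROM clause, as in csv.`/one/one.csv`, but that form takes no reader options, so the columns come out as _c0, _c1, and so on.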
Upvotes: 3