Reputation: 810
I'm trying to use Apache Iceberg to write data to a specified location (S3/local). The configuration I'm using is below.
SBT:
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
libraryDependencies += "org.apache.iceberg" % "iceberg-spark-runtime-3.2_2.12" % "0.13.2"
Spark:
val spark = SparkSession
.builder()
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type","hive")
.config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type","hadoop")
.config("spark.sql.catalog.local.warehouse","/Users/tom/Documents/hive/warehouse")
.getOrCreate()
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF below
// Create a DataFrame
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")
data.write
.format("iceberg")
.save("/Users/tom/Documents/data")
When I run the above code, I run into a missing HiveCatalog dependency error.
I'm not sure why, since iceberg-spark-runtime
contains all the necessary JARs.
I also tried adding iceberg-hive-runtime,
but I still face the same error below.
Cannot initialize Catalog implementation org.apache.iceberg.hive.HiveCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]
java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.hive.HiveCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]
at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:182)
at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:234)
at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:119)
at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:411)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
at org.apache.iceberg.spark.source.IcebergSource.catalogAndIdentifier(IcebergSource.java:129)
at org.apache.iceberg.spark.source.IcebergSource.extractIdentifier(IcebergSource.java:159)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:431)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1810)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1822)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1822)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1804)
at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1880)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1880)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1879)
at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1925)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1925)
at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1923)
at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]
This error occurs at the save step.
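For reference, the iceberg-hive-runtime attempt was just this extra SBT dependency (version assumed to match the Iceberg runtime above):
libraryDependencies += "org.apache.iceberg" % "iceberg-hive-runtime" % "0.13.2"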
Upvotes: 7
Views: 7279
Reputation: 2178
You have to use the DataSourceV2 API, as roizaig suggested.
But even when you use this API, you may still get this error if you don't specify your catalog name when writing to the table.
From roizaig's example (adapted):
yourDF
.writeTo("your-catalog.your-db.your-table")
.createOrReplace()
Notice the your-catalog
prefix: you have to specify it, or else Spark tries to connect to the Hive metastore and fails with the error from your message.
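For example, with the local catalog defined in the question's config, a catalog-qualified write could look roughly like this (the db/table names are just placeholders):
data.writeTo("local.db.table").createOrReplace()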
You can also write to a table without specifying a catalog name like so:
yourDF
.writeTo("your-db.your-table")
.createOrReplace()
But in order to do that you have to set spark.sql.defaultCatalog
in your SparkSession config; an example from my IDE:
val spark = SparkSession.builder()
.master("local[*]")
.appName("Local iceberg test")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "C:\\Users\\wnwnn\\OneDrive\\Desktop\\Iceberg")
.config("spark.sql.defaultCatalog", "local") // <-- look here
.getOrCreate()
Batch example:
spark.sql("CREATE TABLE local.db.test_tbl(name string, age int)")
import spark.implicits._
val batchDF = List(("test", 12), ("test", 21)).toDF("name", "age")
batchDF.writeTo("db.test_tbl").append()
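To sanity-check the batch write, the table can be read back through the default catalog (assuming the spark.sql.defaultCatalog setting above):
spark.table("db.test_tbl").show()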
Also, a streaming example for those interested in structured streaming:
spark.sql("CREATE TABLE local.db.streaming_test(timestamp timestamp, value bigint)")
val streamingDf = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
streamingDf.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
.option("path", "db.streaming_test")
.option("checkpointLocation", "C:\\Users\\wnwnn\\OneDrive\\Desktop\\checkpoints\\streaming_test")
.option("fanout-enabled", "true")
.start()
.awaitTermination()
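While the stream is running (from another session, or after stopping it), the rows landing in the Iceberg table can be inspected with a plain query, for example:
spark.sql("SELECT count(*) FROM local.db.streaming_test").show()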
More info here:
https://iceberg.apache.org/docs/1.3.1/spark-writes/
https://iceberg.apache.org/docs/1.3.1/spark-structured-streaming/
Upvotes: 1
Reputation: 29
You're missing the hive-metastore dependency:
libraryDependencies += "org.apache.hive" % "hive-metastore" % "3.0.0"
Adding it fixes the issue.
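For context, a minimal build.sbt dependency block combining the question's setup with this fix might look like this (versions copied from the question and this answer; compatibility is assumed, not verified):
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
  "org.apache.iceberg" % "iceberg-spark-runtime-3.2_2.12" % "0.13.2",
  "org.apache.hive" % "hive-metastore" % "3.0.0"
)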
Upvotes: 1
Reputation: 145
I had the same issue; use a catalog and the DataFrameWriterV2 API:
spark.conf.set("spark.sql.catalog.your_catalogName", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.your_catalogName.warehouse", "path-to-your-data")
spark.conf.set("spark.sql.catalog.your_catalogName.type", "hadoop") // or "hive"
yourDF
.writeTo("your_catalogName.your-db.your-table")
.createOrReplace()
Upvotes: 2