Leroy Mikenzi

Reputation: 810

Missing hive dependency issues with Apache Iceberg

I'm trying to use Apache Iceberg to write data to a specified location (S3/local). Below is the configuration I'm using.

SBT:

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
libraryDependencies += "org.apache.iceberg" % "iceberg-spark-runtime-3.2_2.12" % "0.13.2"

Spark:

val builder = SparkSession
      .builder()
      .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type","hive")
      .config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.local.type","hadoop")
      .config("spark.sql.catalog.local.warehouse","/Users/tom/Documents/hive/warehouse")
      .getOrCreate()

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import builder.implicits._ // needed for toDF on the Seq below

// Create a DataFrame
val data = Seq(
  ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
  ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
  ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
  ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")

data.write
  .format("iceberg")
  .save("/Users/tom/Documents/data")

When I run the above code, I get a missing HiveCatalog dependency error. I'm not sure why, since iceberg-spark-runtime should contain all the necessary JARs. I also tried adding iceberg-hive-runtime, but I still get the same error below.

Cannot initialize Catalog implementation org.apache.iceberg.hive.HiveCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]
java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.hive.HiveCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:182)
    at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:234)
    at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:119)
    at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:411)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
    at org.apache.iceberg.spark.source.IcebergSource.catalogAndIdentifier(IcebergSource.java:129)
    at org.apache.iceberg.spark.source.IcebergSource.extractIdentifier(IcebergSource.java:159)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
    at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1810)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1822)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1822)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1804)
    at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1880)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1880)
    at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1879)
    at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
    at org.scalatest.Suite.run(Suite.scala:1114)
    at org.scalatest.Suite.run$(Suite.scala:1096)
    at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
    at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1925)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1925)
    at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1923)
    at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.hive.HiveCatalog [java.lang.NoClassDefFoundError: org/apache/thrift/TException]

This error occurs at the save step.

Upvotes: 7

Views: 7279

Answers (3)

Pavel Orekhov

Reputation: 2178

You have to use the DataSourceV2 API, as roizaig suggested.

But even when you use this API, you may still get this error if you don't specify the catalog name when writing to the table.

From roizaig's example (adapted):

yourDF
  .writeTo("your-catalog.your-db.your-table")
  .createOrReplace()

Notice your-catalog: you have to specify it, or else Spark falls back to the default spark_catalog, tries to connect to the Hive metastore, and fails with the error in your message.

You can also write to a table without specifying a catalog name like so:

yourDF
  .writeTo("your-db.your-table")
  .createOrReplace()

But in order to do that you have to set spark.sql.defaultCatalog in your SparkSession config. Here is an example from my IDE:

val spark = SparkSession.builder()
    .master("local[*]")
    .appName("Local iceberg test")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "C:\\Users\\wnwnn\\OneDrive\\Desktop\\Iceberg")
    .config("spark.sql.defaultCatalog", "local") // <-- look here
    .getOrCreate()

Batch example:

spark.sql("CREATE TABLE local.db.test_tbl(name string, age int)")

import spark.implicits._

val batchDF = List(("test", 12), ("test", 21)).toDF("name", "age")
batchDF.writeTo("db.test_tbl").append()
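
To verify the write, you can read the table back through the same catalog (a quick sanity check using the session and table above):

// Read the rows back through the Iceberg catalog to confirm the append worked
spark.table("local.db.test_tbl").show()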

Also, a streaming example for those interested in structured streaming:

spark.sql("CREATE TABLE local.db.streaming_test(timestamp timestamp, value bigint)")

val streamingDf = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()

streamingDf.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
    .option("path", "db.streaming_test")
    .option("checkpointLocation", "C:\\Users\\wnwnn\\OneDrive\\Desktop\\checkpoints\\streaming_test")
    .option("fanout-enabled", "true")
    .start()
    .awaitTermination()
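
Because awaitTermination() blocks the calling thread, you would typically inspect the output from another session (or after stopping the query). A minimal check, assuming the table created above:

// Count the rows the rate stream has written so far
spark.sql("SELECT count(*) FROM local.db.streaming_test").show()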

More info here:

https://iceberg.apache.org/docs/1.3.1/spark-writes/

https://iceberg.apache.org/docs/1.3.1/spark-structured-streaming/

Upvotes: 1

aceoftrumps

Reputation: 29

You are missing the hive-metastore dependency:

libraryDependencies += "org.apache.hive" % "hive-metastore" % "3.0.0"

Adding it fixes the issue.
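
For context, the question's build.sbt dependencies with this line added would look roughly like this (versions copied from the question and this answer):

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"
libraryDependencies += "org.apache.iceberg" % "iceberg-spark-runtime-3.2_2.12" % "0.13.2"
// hive-metastore pulls in the Thrift classes (org.apache.thrift.TException) that HiveCatalog needs
libraryDependencies += "org.apache.hive" % "hive-metastore" % "3.0.0"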

Upvotes: 1

roizaig

Reputation: 145

I had the same issue. Use a catalog together with the DataFrameWriterV2 API:

  spark.conf.set(s"spark.sql.catalog.your_catalogName", "org.apache.iceberg.spark.SparkCatalog")
  spark.conf.set(s"spark.sql.catalog.your_catalogName.warehouse", "path-to-your-data")
  spark.conf.set(s"spark.sql.catalog.your_catalogName.type", "hadoop") // or "hive"

  yourDF
    .writeTo("your-warehouse.your-db.your-table")
    .createOrReplace()
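
The same catalog can also be declared up front on the SparkSession builder instead of via spark.conf.set. A sketch, with your_catalogName and the warehouse path as placeholders:

val spark = SparkSession
  .builder()
  .config("spark.sql.catalog.your_catalogName", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.your_catalogName.type", "hadoop") // or "hive"
  .config("spark.sql.catalog.your_catalogName.warehouse", "path-to-your-data")
  .getOrCreate()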

Upvotes: 2
