Paul
Paul

Reputation: 906

PySpark read Iceberg table, via hive metastore onto S3

I'm trying to interact with Iceberg tables stored on S3 via a deployed hive metadata store service. The purpose is to be able to push-pull large amounts of data stored as an Iceberg datalake (on S3). Couple of days further, documentation, google, stack overflow... just not coming right.

From Iceberg's documentation the only dependencies seemed to be iceberg-spark-runtime, without guidelines from a pyspark perspective, but this is basically how far I got:

  1. iceberg-spark-runtime with set metadata-store uri allowed me to make meta data calls like listing database etc. (metadata DB functionality - postgres)
  2. Trial-error jar additions to get through the majority of java ClassNotFound exceptions.
  • After iceberg-hive-runtime-1.2.0.jar

Next exc > java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

  • After iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar

Next exc > java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException

  • After adding iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar

Next exc > java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/WeakRefMetricsSource

  • After adding iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar,hadoop-common-3.3.5.jar

Next exc > org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file Caused by: java.nio.file.AccessDeniedException

  1. From a Jupyter pod on k8s the s3 serviceaccount was added, and tested that interaction was working via boto3. From pyspark, table reads did however still raise exceptions with s3.model.AmazonS3Exception: Forbidden, until finding the correct spark config params that can be set (using s3 session tokens mounted into pod from service account)
  2. Next exceptions was related to java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding, even though seeing that the functions are explicitly contained within the hadoop-common.jar by viewing the class source code

This is where I decided to throw in the towel, to ask if I'm going down completely the wrong rabbit hole, or what's going on. This is my current code with some of the example tests:

token : str = "-- jwt s3 session token --"

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

conf = SparkConf()
conf.set("spark.jars", "iceberg-hive-runtime-1.2.0.jar,hadoop-aws-3.3.5.jar,aws-java-sdk-bundle-1.12.316.jar,hadoop-common-3.3.5.jar")
conf.set("hive.metastore.uris", "thrift://hivemetastore-hive-metastore:9083")
conf.set("fs.s3a.assumed.role.arn", "-- aws iam role --")
conf.set("spark.hadoop.fs.s3a.session.token", token)
conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")

sc = SparkContext( conf=conf)
spark = SparkSession.builder.appName("py sql").enableHiveSupport() \
    .getOrCreate()
$ hive --version

# response
WARNING: log4j.properties is not found. HADOOP_CONF_DIR may be incomplete.
WARNING: log4j.properties is not found. HADOOP_CONF_DIR may be incomplete.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive 3.1.3
Git git://MacBook-Pro.fios-router.home/Users/ngangam/commit/hive -r 4df4d75bf1e16fe0af75aad0b4179c34c07fc975
Compiled by ngangam on Sun Apr 3 16:58:16 EDT 2022
From source with checksum 5da234766db5dfbe3e92926c9bbab2af

From this I'm able to:

# list databases
spark.catalog.listDatabases()

# print table schema
spark_df = spark.sql("select * from db_name.table_name")
spark_df.printSchema()

# show tables from database via sql, but not with pyspark function
# -> works
spark.sql("show tables from db_name").show()
# -> not work
spark.catalog.listTables('db_name')

# not able to interact - read data from the actual external s3 table
spark.read.format("iceberg")
spark.catalog.setCurrentDatabase('db_name')
spark.read.table("table_name")

Iceberg table S3 interfacing with the exception log being (from point 4):

Py4JJavaError: An error occurred while calling o44.table.
: java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'
    at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
    at java.base/java.io.DataInputStream.read(DataInputStream.java:151)
    at org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:123)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1685)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1084)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3714)
    at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:273)
    at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266)
    at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:189)
    at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)
    at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:185)
    at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:180)
    at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:176)
    at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
    at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
    at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)
    at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:124)
    at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:111)
    at org.apache.iceberg.mr.hive.HiveIcebergSerDe.initialize(HiveIcebergSerDe.java:84)
    at org.apache.hadoop.hive.serde2.AbstractSerDe.initialize(AbstractSerDe.java:54)
    at org.apache.hadoop.hive.serde2.SerDeUtils.initializeSerDe(SerDeUtils.java:533)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:453)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:440)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:281)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:263)
    at org.apache.hadoop.hive.ql.metadata.Table.getColsInternal(Table.java:641)
    at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:624)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree2$1(HiveClientImpl.scala:448)
    at org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$convertHiveTableToCatalogTable(HiveClientImpl.scala:447)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$3(HiveClientImpl.scala:434)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$1(HiveClientImpl.scala:434)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:298)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:229)
    at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:228)
    at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:278)
    at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:432)
    at org.apache.spark.sql.hive.client.HiveClient.getTable(HiveClient.scala:95)
    at org.apache.spark.sql.hive.client.HiveClient.getTable$(HiveClient.scala:94)
    at org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:92)
    at org.apache.spark.sql.hive.HiveExternalCatalog.getRawTable(HiveExternalCatalog.scala:122)
    at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:729)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
    at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:729)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:515)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:500)
    at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.loadTable(V2SessionCatalog.scala:66)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:311)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1202)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$1(Analyzer.scala:1201)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1193)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1064)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1028)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1028)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:987)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:231)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:212)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.DataFrameReader.table(DataFrameReader.scala:607)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

Upvotes: 6

Views: 8277

Answers (2)

Paul
Paul

Reputation: 906

Checking versions was part of the solution, in addition to other config that was required to be set to for interfacing with Iceberg via a setup hive metastore service. An example of the entire config example for completeness:

token = "<jwt for auth purposes to AWS services>"

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.jars", "hadoop-aws-3.3.2.jar,iceberg-spark-runtime-3.3_2.12-1.3.0.jar,aws-java-sdk-bundle-1.11.1026.jar")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")


# # link to remote metastore
conf.set("hive.metastore.uris", "thrift://hivemetastore-hive-metastore:9083")
conf.set("spark.hadoop.fs.s3a.session.token", token)
conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")

# used to transmit pandas dataframes via arrow to iceberg table
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")  

# Set 'spark_catalog' config for hive
conf.set("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")
conf.set("spark.sql.catalog.spark_catalog.type","hive")
conf.set("spark.sql.catalog.hive_prod","org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.hive_prod.type","hive")
conf.set("spark.sql.catalog.hive_prod.uri", "thrift://hivemetastore-hive-metastore:9083")

sc = SparkContext( conf=conf)
spark = SparkSession.builder.appName("py sql").enableHiveSupport().getOrCreate()

From this you can run the typical spark calls, as example functions that require table interaction from S3 (not just meta data)

  • Table reading function
spark.table("<DB name>.<table name>").count()
  • Writing function to table
longdf = spark.createDataFrame(df) 
longdf.writeTo(f"<DB name>.<table name>").append() 

NOTE: I did however notice a weird occurrence where the spark memory usage gradually just keeps increasing as you do read / write commands. Uncertain what spark config to set to limit memory usage sufficiently

Upvotes: 1

zhukovgreen
zhukovgreen

Reputation: 1648

I had a similar issue and reducing the patch version on hadoop libs helped me to resolve the issue:

spark.jars.packages org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4

Upvotes: 4

Related Questions