Averell

Reputation: 843

Spark - EMR - GlueCatalog: DataFrameWriter.bucketBy() fails with UnknownHostException

I'm trying to save my Spark dataframe (from a Zeppelin notebook running on EMR) to the GlueCatalog in the same AWS account. saveAsTable() works without any issue when I don't use bucketBy(); when I do, I get the UnknownHostException below.

That hostname does not belong to my EMR cluster, and when I change the database name, a different hostname is reported.

My questions are: where is the configuration for that hostname? What is it for? And why does bucketBy() need it?

Thanks for your help. Averell

spark.sql("use my_database_1")
my_df.write.partitionBy("dt").mode("overwrite").bucketBy(10, "id").option("path","s3://my-bucket/").saveAsTable("my_table")
java.lang.IllegalArgumentException: java.net.UnknownHostException: ip-10-10-10-71.ourdc.local
  at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:418)
  at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:132)
  at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:351)
  at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:285)
  at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
  at org.apache.spark.sql.hive.HiveExternalCatalog.saveTableIntoHive(HiveExternalCatalog.scala:496)
  at org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$createDataSourceTable(HiveExternalCatalog.scala:399)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply$mcV$sp(HiveExternalCatalog.scala:263)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply(HiveExternalCatalog.scala:236)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply(HiveExternalCatalog.scala:236)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
  at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:236)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:324)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:185)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:474)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:453)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
  ... 47 elided
Caused by: java.net.UnknownHostException: ip-10-10-10-71.ourdc.local
  ... 87 more

Upvotes: 0

Views: 557

Answers (1)

Averell

Reputation: 843

There were two separate issues in my question:

  1. Where the hostname came from.
  2. Why the problem appeared only when bucketBy() was used.

For question (1), our Glue database was created using spark.sql("create database mydb"). This creates a Glue database whose location is set to an HDFS path, which defaults to the EMR master node's address. 10.10.10.71 was the IP address of our old EMR cluster (which has since been terminated).
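
You can confirm this by inspecting the location that Glue has registered for the database. A minimal check, assuming the database is named my_database_1 as in the question:

// Show the metadata Glue holds for the database. If it was created without
// an explicit LOCATION, this typically reports an hdfs:// URI pointing at the
// master node of the cluster that created it, e.g.
// hdfs://ip-10-10-10-71.ourdc.local:8020/user/hive/warehouse/my_database_1.db
spark.sql("DESCRIBE DATABASE my_database_1").show(truncate = false)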

For question (2), it seems that when doing bucketBy() and sortBy(), Spark needs some temporary space before writing to the final destination. The location of that temporary space is the default location of the database, with the full path being <db_location>-<table_name>-__PLACEHOLDER__.
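
For new databases, one way to avoid the problem altogether is to create the database with an explicit S3 location, so that the temporary path derived from it is reachable from any cluster. A sketch with placeholder names (my_database_2, my-bucket, my_table):

// Assumed names throughout. Storing an s3:// location in Glue means the
// -__PLACEHOLDER__ staging path no longer points at a cluster-specific HDFS host.
spark.sql("CREATE DATABASE my_database_2 LOCATION 's3://my-bucket/my_database_2/'")
spark.sql("use my_database_2")
my_df.write.partitionBy("dt").mode("overwrite").bucketBy(10, "id").option("path", "s3://my-bucket/my_table/").saveAsTable("my_table")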

Fixes: for (1), modify the location of the database in Glue. Nothing needs to (or can) be done about (2).
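
For an existing database, the location can be changed in the Glue console or via Glue's UpdateDatabase API. A sketch using the AWS SDK for Java v1 from Scala (database and bucket names are placeholders; assumes aws-java-sdk-glue is on the classpath):

import com.amazonaws.services.glue.AWSGlueClientBuilder
import com.amazonaws.services.glue.model.{DatabaseInput, UpdateDatabaseRequest}

// Re-point the stale database at an S3 location instead of the dead HDFS host.
val glue = AWSGlueClientBuilder.defaultClient()
glue.updateDatabase(new UpdateDatabaseRequest()
  .withName("my_database_1")
  .withDatabaseInput(new DatabaseInput()
    .withName("my_database_1")
    .withLocationUri("s3://my-bucket/my_database_1/")))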

Upvotes: 2
