Eric Meadows

Reputation: 907

Spark .saveAsTable on AWS EMR writing to the Glue Catalog is failing

In Scala, I am writing my DataFrame out to S3 using .saveAsTable, but Glue does not update itself properly with respect to the database location, format, etc. For background, the incoming dataset is 1.5 TB of JSON and the destination format is Parquet; all Parquet files are written successfully, although the rename step is quite slow.

import org.apache.spark.sql.functions.{col, lit}

val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")

val currentEvent = "fooBar"
val tableLowerCase = glueCatalog.fixTableName(currentEvent) // helper sketched after this block
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)

val currentEventDf = spark.read.json(
  dfWithJson
    .filter(col("event") === lit(currentEvent))
    .select(jsonColumn)
    .as[String]
)
// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)

val dfWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  // Note: DROPMALFORMED is a read-side option; on a writer it has no effect
  // and is simply stored as a table property (see the serde parameters below)
  .option("mode", "DROPMALFORMED")
  .mode(writeMode)
  .format(destinationFormatType)
  .option("path", s3Path)
if (replaceTable) {
  println("\t- .saveAsTable")
  dfWriter
    .partitionBy(partitionBy: _*)
    .saveAsTable(tablePathInDb)
} else {
  println("\t- .insertInto")
  dfWriter.insertInto(tablePathInDb)
}
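
For reference, glueCatalog.fixTableName is a helper from this codebase; below is a minimal hypothetical sketch of it, assuming it only normalizes the event name to a legal lowercase Glue table name (Glue lowercases identifiers, which is why "fooBar" appears as "foobar" in the console output further down):

// Hypothetical sketch of the fixTableName helper: Glue stores identifiers
// in lowercase, so normalize the event name into a legal table name.
def fixTableName(name: String): String =
  name.toLowerCase.replaceAll("[^a-z0-9_]", "_")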

The data itself is written and readable from S3 via Spark, but Glue registers the Hive table improperly:

Name                     foobar
Description
Database                 bar
Classification           Unknown
Location                 s3://foo_bucket/hive-metastore/bar.db/foobar-PLACEHOLDER
Connection
Deprecated               No
Last updated             Thu Jan 09 16:55:23 GMT-800 2020
Input format             org.apache.hadoop.mapred.SequenceFileInputFormat
Output format            org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Serde serialization lib  org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Serde parameters:
  mode                  DROPMALFORMED
  path                  s3://foo_bucket/foobar
  serialization.format  1
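
The same wrong metadata can also be checked from Spark itself rather than the Glue console; this is a plain metastore query against the table above:

// Print the metadata the metastore recorded for the table (SerDe, location,
// input/output formats); truncate = false keeps the long class names intact.
spark.sql("DESCRIBE FORMATTED bar.foobar").show(100, truncate = false)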

Upvotes: 0

Views: 1638

Answers (1)

Eric Meadows

Reputation: 907

To work around Glue improperly registering the data as a SequenceFile, route the write through Spark's Hive writer:

val destinationFormatType = "hive"

Additionally, add the following option to dfWriter; fileFormat names the on-disk format, so it must stay "parquet" rather than reuse destinationFormatType (which is now "hive"):

  .option(
    "fileFormat",
    "parquet"
  )
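
Putting both changes together, a minimal sketch of the corrected writer, reusing dfToWrite, partitionBy, writeMode, s3Path, and tablePathInDb as defined in the question:

import org.apache.spark.sql.functions.col

// Write through the Hive serde path so Glue records a Parquet SerDe and the
// real S3 location instead of the SequenceFile placeholder.
val fixedWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .mode(writeMode)
  .format("hive")                  // Hive writer instead of native parquet
  .option("fileFormat", "parquet") // actual on-disk file format
  .option("path", s3Path)

fixedWriter
  .partitionBy(partitionBy: _*)
  .saveAsTable(tablePathInDb)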

Upvotes: 1
