adam.staros

Reputation: 61

Unable to query Glue table from Athena after updating partitions in a Glue job

We have a strange issue with Glue/Athena. We created a Glue table in CloudFormation without a predefined schema so we can take advantage of DynamicFrame:

OurGlueTable:
  Type: AWS::Glue::Table
  Properties:
    DatabaseName: !Ref OurGlueDatabase
    CatalogId: !Ref AWS::AccountId
    TableInput:
      Name: "device"
      Description: "Device table"
      TableType: EXTERNAL_TABLE
      Parameters: {"classification": "glueparquet"}
      StorageDescriptor:
        Compressed: false
        InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
        OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
        SerdeInfo:
          SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
          Parameters: {serialization.format: 1}
        Location: !Ref ExternalTableDataS3Location
        StoredAsSubDirectories: false

We save data to this table in a Scala Spark Glue (version 1.0) ETL job:

  override def saveData(transformedDataFrame: DataFrame): Unit = {
    val frameWithDateColumns: DataFrame = addMissingColumns(transformedDataFrame)
    val dynamicFrame = DynamicFrame.apply(frameWithDateColumns, glueContext)
    val sink: DataSink = getSink
    sink.writeDynamicFrame(dynamicFrame)
  }
  private def addMissingColumns(transformedDataFrame: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions._
    transformedDataFrame
      .withColumn("year", lit(processingYear))
      .withColumn("month", lit(processingMonth))
      .withColumn("day", lit(processingDay))
  }
  private def getSink = {
    val database = config.getString("database")
    val table = config.getString("table")
    val options: JsonOptions = JsonOptions(Map("partitionKeys" -> Seq("year", "month", "day"), "enableUpdateCatalog" -> true))
    val format = "glueparquet"
    val sink = glueContext.getCatalogSink(database = database, tableName = table, additionalOptions = options)
    sink.setFormat(format, JsonOptions(Map()))
    sink
  }

After the first job, the Glue table schema and partitions are updated and we are able to read the data in Athena. After a second job with the same input data but a different day parameter (which creates a different partition), the Glue table schema stays the same and we can see the new partition in the Glue console (which is correct), but this time reading the data in Athena fails with:

HIVE_METASTORE_ERROR: com.facebook.presto.spi.PrestoException: Error: type expected at the position 0 of 'long' but 'long' is found. (Service: null; Status Code: 0; Error Code: null; Request ID: null)

When we try to refresh the partitions with MSCK REPAIR TABLE, there is another error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Name is null

What is important: if we run the jobs for two days in parallel, we are able to read the data in Athena; this issue occurs only when one job runs after another.

We have already tried changing the table definition in CloudFormation to create the table with predefined partition keys (year, month, day), and we changed StoredAsSubDirectories to true, but it did not help.

While implementing our code we followed this article: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html.

Has anyone had a similar problem?

Upvotes: 4

Views: 2283

Answers (1)

vfrank66

Reputation: 1468

We had the same error in a different scenario: in our case it was not a partition column, and we had moved from the "glueparquet" writer to a pyspark save with overwrite mode, using boto3 to update/insert the table and partition definitions (similar to what the aws-data-wrangler package provides, but with pyspark DataFrames).

Our problem was that the AWS Glue table schema definition does not accept the "long" datatype; the table schema uses the "bigint" datatype instead. The AWS Glue partition schema, on the other hand, does accept "long".

table schema -> bigint

partition schema -> long (or bigint)

We had a column with "bigint" in the table schema while the partitions said "long". This was fine for AWS Glue, which does not have a DDL layer like Athena, but the Athena query failed with the error:

HIVE_METASTORE_ERROR: com.amazonaws.services.datacatalog.model.InvalidInputException: Error: type expected at the position 0 of 'long' but 'long' is found. (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)
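If you want to confirm whether you are hitting the same mismatch, a minimal boto3 sketch along these lines can compare the column types declared on the table with the ones stored on each partition (the database name below is a placeholder; "device" is the table from the question):

import boto3

glue = boto3.client("glue")

DATABASE = "our_database"   # placeholder database name
TABLE = "device"            # table name from the question

# Column types declared on the table itself
table = glue.get_table(DatabaseName=DATABASE, Name=TABLE)["Table"]
table_types = {c["Name"]: c["Type"] for c in table["StorageDescriptor"]["Columns"]}

# Column types stored per partition (these are the ones that can say "long")
partitions = glue.get_partitions(DatabaseName=DATABASE, TableName=TABLE)["Partitions"]
for p in partitions:
    for c in p["StorageDescriptor"]["Columns"]:
        if c["Type"] != table_types.get(c["Name"]):
            print(p["Values"], c["Name"], c["Type"], "!=", table_types.get(c["Name"]))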

We resolved this by changing all "long" datatypes to "bigint", while continuing to use the pyspark save with overwrite mode and a schema built with pyspark.sql.types.LongType(), and it works again. I believe this is what "glueparquet" does behind the scenes.
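The actual fix on our side was done in our own boto3 helpers, but a rough sketch of remapping the table schema types (again with a placeholder database name) could look like this:

import boto3

glue = boto3.client("glue")

DATABASE = "our_database"   # placeholder database name
TABLE = "device"

table = glue.get_table(DatabaseName=DATABASE, Name=TABLE)["Table"]

# Rewrite every "long" column type as "bigint" in the table schema
for col in table["StorageDescriptor"]["Columns"]:
    if col["Type"] == "long":
        col["Type"] = "bigint"

# update_table only accepts TableInput fields, so copy just the writable keys
table_input = {
    k: table[k]
    for k in ("Name", "Description", "TableType", "Parameters", "PartitionKeys", "StorageDescriptor")
    if k in table
}
glue.update_table(DatabaseName=DATABASE, TableInput=table_input)

Existing partitions keep their own StorageDescriptor, so if their schemas also need fixing the same remapping can be applied per partition with glue.update_partition.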

Upvotes: 0
