Reputation: 168
I have a Kinesis Firehose configuration in Terraform which reads data from a Kinesis stream in JSON, converts it to Parquet using Glue, and writes it to S3. There is something wrong with the data format conversion and I am getting the error below (some details removed):
{"attemptsMade":1,"arrivalTimestamp":1624541721545,"lastErrorCode":"DataFormatConversion.InvalidSchema","lastErrorMessage":"The schema is invalid. The specified table has no columns.","attemptEndingTimestamp":1624542026951,"rawData":"xx","sequenceNumber":"xx","subSequenceNumber":null,"dataCatalogTable":{"catalogId":null,"databaseName":"db_name","tableName":"table_name","region":null,"versionId":"LATEST","roleArn":"xx"}}
The Terraform configuration for the Glue table I am using is as follows:
resource "aws_glue_catalog_table" "stream_format_conversion_table" {
name = "${var.resource_prefix}-parquet-conversion-table"
database_name = aws_glue_catalog_database.stream_format_conversion_db.name
table_type = "EXTERNAL_TABLE"
parameters = {
EXTERNAL = "TRUE"
"parquet.compression" = "SNAPPY"
}
storage_descriptor {
location = "s3://${element(split(":", var.bucket_arn), 5)}/"
input_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
ser_de_info {
name = "my-stream"
serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
parameters = {
"serialization.format" = 1
}
}
columns {
name = "metadata"
type = "struct<tenantId:string,env:string,eventType:string,eventTimeStamp:timestamp>"
}
columns {
name = "eventpayload"
type = "struct<operation:string,timestamp:timestamp,user_name:string,user_id:int,user_email:string,batch_id:string,initiator_id:string,initiator_email:string,payload:string>"
}
}
}
What needs to change here?
Upvotes: 7
Views: 3129
Reputation: 1330
Thought I'd post here as I was facing the same problem and found a workaround that appears to work.
As stated above, AWS does not allow you to use a table generated from an existing schema for Firehose data format conversion. That said, if you are using Terraform you can create the first table from the existing schema, then use the columns attribute of that table to create a second table, and then point the Firehose data format conversion configuration at that second table. I can confirm this works.
Terraform for the tables:
resource "aws_glue_catalog_table" "aws_glue_catalog_table_from_schema" {
name = "first_table"
database_name = "foo"
storage_descriptor {
schema_reference {
schema_id {
schema_arn = aws_glue_schema.your_glue_schema.arn
}
schema_version_number = aws_glue_schema.your_glue_schema.latest_schema_version
}
}
}
resource "aws_glue_catalog_table" "aws_glue_catalog_table_from_first_table" {
name = "second_table"
database_name = "foo"
storage_descriptor {
dynamic "columns" {
for_each = aws_glue_catalog_table.aws_glue_catalog_table_from_schema.storage_descriptor[0].columns
content {
name = columns.value.name
type = columns.value.type
}
}
}
}
Firehose data format conversion configuration:
data_format_conversion_configuration {
  output_format_configuration {
    serializer {
      parquet_ser_de {}
    }
  }

  input_format_configuration {
    deserializer {
      hive_json_ser_de {}
    }
  }

  schema_configuration {
    database_name = aws_glue_catalog_table.aws_glue_catalog_table_from_first_table.database_name
    role_arn      = aws_iam_role.firehose_role.arn
    table_name    = aws_glue_catalog_table.aws_glue_catalog_table_from_first_table.name
  }
}
Upvotes: 0
Reputation: 7288
Frustrated by having to manually define columns, I wrote a little Python tool that takes a Pydantic class (it could be made to work with JSON Schema too) and generates a JSON file that can be used with Terraform to create the table.
from pydantic import BaseModel
from typing import List

class Bar(BaseModel):
    name: str
    age: int

class Foo(BaseModel):
    nums: List[int]
    bars: List[Bar]
    other: str
gets converted to:
{
  "nums": "array<int>",
  "bars": "array<struct<name:string,age:int>>",
  "other": "string"
}
and can be used in Terraform like so:
locals {
  columns = jsondecode(file("${path.module}/glue_schema.json"))
}

resource "aws_glue_catalog_table" "table" {
  name          = "table_name"
  database_name = "db_name"

  storage_descriptor {
    dynamic "columns" {
      for_each = local.columns
      content {
        name = columns.key
        type = columns.value
      }
    }
  }
}
Upvotes: 0
Reputation: 41
In addition to the answer from mberchon, I found that the default generated policy for the Kinesis delivery stream did not include the necessary IAM permissions to actually read the schema.
I had to manually modify the IAM policy to include glue:GetSchema and glue:GetSchemaVersion.
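For reference, a minimal Terraform sketch of what that could look like, assuming the Firehose delivery role is managed in the same configuration (the resource and policy names here are illustrative, not from the original answer):
# Illustrative example: attach the missing Glue schema-registry permissions
# to the Firehose delivery role referenced elsewhere in this thread.
resource "aws_iam_role_policy" "firehose_glue_schema_access" {
  name = "firehose-glue-schema-access"
  role = aws_iam_role.firehose_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "glue:GetSchema",
          "glue:GetSchemaVersion",
        ]
        # Scope this down to the specific registry/schema ARNs in real use.
        Resource = "*"
      },
    ]
  })
}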
Upvotes: 1
Reputation: 121
I also ran into "The schema is invalid. The specified table has no columns".
It turns out that Kinesis Data Firehose is unable to read the table's schema if the table was created from an existing schema. The table has to be created from scratch (as opposed to using "Add table from existing schema"). This isn't documented ... for now.
Upvotes: 5