krishna hegde

Terraform: Create a Kinesis Firehose delivery stream using the for_each option

My variables file:

variable "my_bucket_map" {
  type = map(object({
    name           = string
    suffix         = string
  }))
  default = {}
}

My S3 bucket:

resource "aws_s3_bucket" "my_bucket" {
  for_each = var.my_bucket_map
  bucket   = "${lower(each.value.name)}.${lower(each.value.suffix)}"
}

My Glue table, which depends on the S3 bucket above:

resource "aws_glue_catalog_table" "my_glue_table" {
  for_each      = aws_s3_bucket.my_bucket  # chained for_each used here
  name          = "my_bucket"
  database_name = aws_glue_catalog_database.my_glue_db.name
  table_type    = "EXTERNAL_TABLE"

  partition_keys {
    name = "date"
    type = "date"
  }

  storage_descriptor {
    columns {
      name = "file_name"
      type = "string"
    }
    # ... (other columns omitted)

    compressed    = false
    location      = "s3://${each.value.id}/"
    input_format  = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"

    ser_de_info {
      name                  = "ParquetHiveSerDe"
      serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
    }
  }
}

My Kinesis Firehose (KFH) delivery stream, which depends on both the Glue table and the S3 bucket created above:

resource "aws_kinesis_firehose_delivery_stream" "my_kinesis_fh" {
  for_each    = var.my_bucket_map
  name        = "${lower(each.value.name)}.${lower(each.value.suffix)}"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn        = aws_iam_role.my_firehose_role.arn
    bucket_arn      = xxxxxxxxxxxxxxxx #==> how to populate this??????
    buffer_size     = 128
    buffer_interval = 60

    data_format_conversion_configuration {
      input_format_configuration {
        deserializer {
          open_x_json_ser_de {
          }
        }
      }

      output_format_configuration {
        serializer {
          parquet_ser_de {
          }
        }
      }

      schema_configuration {
        database_name = aws_glue_catalog_database.my_glue_db.name
        role_arn      = aws_iam_role.my_firehose_role.arn
        table_name    = xxxxxxxxxxxxxxxx #==> how to populate this??????
      }
    }
  }

}

I have a map, my_bucket_map, which I use to create multiple AWS resources. From it I create the S3 buckets, and the Glue tables are then created from those buckets with a chained for_each. The Kinesis Firehose delivery stream depends on both of them, because it has to reference some of their attributes, but I can't work out how to derive bucket_arn and table_name since they are not directly accessible.

How can I get these values? I'm also open to suggestions for creating the S3 buckets and Glue tables differently if that solves this scenario.

Answers (1)

Marko E

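I would probably use the same variable as the for_each source for all resources, because every resource then has the same instance keys and you can look up a related resource with each.key, which gives you better control over chaining the resources. The Glue catalog table would then iterate over the my_bucket_map variable, and you would just change the location to reference the bucket ID through aws_s3_bucket.my_bucket[each.key].id: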

resource "aws_glue_catalog_table" "my_glue_table" {
  for_each      = var.my_bucket_map
  name          = "${lower(each.value.name)}_${lower(each.value.suffix)}" # table names must be unique within the database
  database_name = aws_glue_catalog_database.my_glue_db.name
  table_type    = "EXTERNAL_TABLE"

  partition_keys {
    name = "date"
    type = "date"
  }

  storage_descriptor {
    columns {
      name = "file_name"
      type = "string"
    }
    # ... (other columns omitted)

    compressed    = false
    location      = "s3://${aws_s3_bucket.my_bucket[each.key].id}/"
    input_format  = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"

    ser_de_info {
      name                  = "ParquetHiveSerDe"
      serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
    }
  }
}
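The name expression "${lower(each.value.name)}.${lower(each.value.suffix)}" is repeated in more than one resource. One option, not part of the original answer, is to compute the per-key names once in a local map keyed exactly like var.my_bucket_map, so every resource looks its names up with each.key. A minimal sketch; the local called names and its table naming scheme are hypothetical:

```hcl
locals {
  # Hypothetical helper: per-key names, keyed exactly like var.my_bucket_map
  names = {
    for key, b in var.my_bucket_map : key => {
      # Same expression the S3 bucket and the Firehose stream already use
      bucket = "${lower(b.name)}.${lower(b.suffix)}"
      # Hypothetical table naming scheme; Glue table names must be unique within a database
      table = "${lower(b.name)}_${lower(b.suffix)}"
    }
  }
}
```

Each resource would then use, for example, bucket = local.names[each.key].bucket in aws_s3_bucket.my_bucket and name = local.names[each.key].table in aws_glue_catalog_table.my_glue_table, so the names cannot drift apart between resources.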

Then, for the Kinesis Firehose delivery stream, you would look up the bucket and the Glue table by the same key:

resource "aws_kinesis_firehose_delivery_stream" "my_kinesis_fh" {
  for_each    = var.my_bucket_map
  name        = "${lower(each.value.name)}.${lower(each.value.suffix)}"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn        = aws_iam_role.my_firehose_role.arn
    bucket_arn      = aws_s3_bucket.my_bucket[each.key].arn
    buffer_size     = 128
    buffer_interval = 60

    data_format_conversion_configuration {
      input_format_configuration {
        deserializer {
          open_x_json_ser_de {
          }
        }
      }

      output_format_configuration {
        serializer {
          parquet_ser_de {
          }
        }
      }

      schema_configuration {
        database_name = aws_glue_catalog_database.my_glue_db.name
        role_arn      = aws_iam_role.my_firehose_role.arn
        table_name    = aws_glue_catalog_table.my_glue_table[each.key].name
      }
    }
  }

}
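These lookups work because every resource uses the same instance keys. That also holds if the Glue table keeps the question's chained for_each = aws_s3_bucket.my_bucket, since for_each over a resource's instances reuses the keys of var.my_bucket_map. As a hypothetical check (not part of the original answer), an output like the following lists, per map key, the bucket ARN and Glue table name each stream was actually configured with:

```hcl
# Hypothetical output for checking the wiring after terraform apply
output "firehose_wiring" {
  value = {
    for key, stream in aws_kinesis_firehose_delivery_stream.my_kinesis_fh : key => {
      stream_name = stream.name
      # Nested blocks are exposed as single-element lists, hence the [0] indexes
      bucket_arn = stream.extended_s3_configuration[0].bucket_arn
      table_name = stream.extended_s3_configuration[0].data_format_conversion_configuration[0].schema_configuration[0].table_name
    }
  }
}
```

Running terraform output firehose_wiring then shows one entry per key of my_bucket_map, which makes a mismatched bucket or table easy to spot.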
