newbieitTech

Reputation: 127

Glue: map/process a source table's column data and write it to columns in a pre-existing Redshift table

I am very new to Glue and have come across a scenario where we have a source table in the Glue Catalog and need to write its data to specific columns in a pre-existing table in Redshift, e.g.

source_table_name[source_table_column_name]       target_table_name[target_table_column_name]

employee[id]                                      resource[resource_id]
employee[name]                                    resource[resource_name]
employee[policy]                                  resource[policy_name]
employee[zip]                                     resource[zipcode]
...                                               ...

Could you please share which Glue function can be used to write a Python UDF that iterates through a given subset of column names from the source table and maps/writes that data to the specified column names in the Redshift target table (as in the example above)?

For example: write the id column data from employee (source table) to the resource_id column in resource (target table) in Redshift, and so on.

I've written the following logic to load the data into source_dynf:

    def load_data(self):
        self.logger.info(f"Loading data from Glue Catalog source [{self.source_database}/{self.source_table}]")
        source_dynf = self.glue_context.create_dynamic_frame.from_catalog(
            database=self.source_database,
            table_name=self.source_table,
            transformation_ctx=f"load_{self.source_database}_{self.source_table}"
        )
        return source_dynf

    def process_data(self, source_dynf):
        # How can I map the data as described above and return a processed_dynf
        # from here that I can write to the Redshift target table?
        pass

    def write_data(self):
        # Write to the Redshift target table
        pass

Thanks in advance for any suggestions/help!

Upvotes: 0

Views: 1423

Answers (1)

Jon Legendre

Reputation: 370

If you are just renaming all columns, the typical pattern is:

    # in your imports
    from awsglue.transforms import ApplyMapping

    # just after your from_catalog call
    # each tuple is: (source column name, source column type, target column name, target column type)
    source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=[
        ("id", "string", "resource_id", "string"),
        ("name", "string", "resource_name", "string"),
        # and so on, following the same pattern
    ], transformation_ctx="mapping")
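If the column list is long or changes often, the mappings list can be generated from a plain dict instead of being typed out by hand. A minimal sketch, assuming the names COLUMN_MAP, SOURCE_TYPES, and build_mappings (all hypothetical helpers, not part of Glue) and assuming each column keeps its type:

```python
# Hypothetical source -> target column names, taken from the question's example.
COLUMN_MAP = {
    "id": "resource_id",
    "name": "resource_name",
    "policy": "policy_name",
    "zip": "zipcode",
}

# Assumed source types; replace these with the types shown in your Glue Catalog table.
SOURCE_TYPES = {"id": "string", "name": "string", "policy": "string", "zip": "string"}


def build_mappings(column_map, source_types):
    """Return (source, source_type, target, target_type) tuples for ApplyMapping."""
    mappings = []
    for source_col, target_col in column_map.items():
        col_type = source_types[source_col]  # a KeyError here means a type is missing
        mappings.append((source_col, col_type, target_col, col_type))
    return mappings


mappings = build_mappings(COLUMN_MAP, SOURCE_TYPES)
```

The result can then be passed as mappings=mappings to ApplyMapping.apply. As far as I know, columns that are not listed in the mappings are dropped from the output, so the dict also acts as the column subset.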

If you plan to use PySpark DataFrames instead, the syntax is simpler and you don't have to fool with the types:

    # in your imports
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame

    spark_context = SparkContext.getOrCreate()
    glue_context = GlueContext(spark_context)

    frame = source_dynf.toDF()
    # the arguments are the new column names, in order; the number of names
    # must equal the number of columns in the frame
    frame = frame.toDF("resource_id", "resource_name")  # and so on
    source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
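Note that toDF with new names renames every column by position, so it does not help if you only want a subset. One way to handle a subset on the DataFrame side is to build select expressions from a dict. A rough sketch, where build_select_exprs and the dict contents are hypothetical names for illustration:

```python
def build_select_exprs(column_map):
    """Return Spark SQL expressions such as '`id` AS `resource_id`'."""
    # Backticks quote the identifiers so unusual column names still parse.
    return [f"`{src}` AS `{tgt}`" for src, tgt in column_map.items()]


# Only the columns named here would be kept, each under its new name.
exprs = build_select_exprs({"id": "resource_id", "zip": "zipcode"})

# In the Glue job itself (not executed in this sketch):
#   frame = source_dynf.toDF().selectExpr(*exprs)
```

The expressions are plain strings, so they can be logged or unit tested without a Spark session.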

Per the discussion below, you would like to pull the schema from your target database and push it onto the source data. Something like this should do the trick:

    # get the schema for the target frame
    # see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
    # note: you may want to read from a small partition for performance, see:
    # https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html
    # args is assumed to come from getResolvedOptions(sys.argv, ["TempDir"])
    my_conn_options = {
        "url": "jdbc:redshift://host:port/redshift database name",
        "dbtable": "redshift table name",
        "user": "username",
        "password": "password",
        "redshiftTmpDir": args["TempDir"],
        "aws_iam_role": "arn:aws:iam::account id:role/role name"
    }

    target_frame = glue_context.create_dynamic_frame_from_options(
        connection_type="redshift", connection_options=my_conn_options)
    frame = source_dynf.toDF()
    # rename by position using the target's column names; the number of columns must match!
    frame = frame.toDF(*target_frame.toDF().columns)
    source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
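Because this rename is purely positional, it may be worth checking the pairing before applying it: a matching column count with a different column order would put data under the wrong names. A small sketch of such a check, where check_positional_rename is a hypothetical helper and the column lists stand in for frame.columns and the target's columns:

```python
def check_positional_rename(source_columns, target_columns):
    """Pair columns by position; raise if the counts differ."""
    if len(source_columns) != len(target_columns):
        raise ValueError(
            f"column count mismatch: {len(source_columns)} source "
            f"vs {len(target_columns)} target"
        )
    return list(zip(source_columns, target_columns))


# Example column lists taken from the question's employee/resource tables.
pairs = check_positional_rename(
    ["id", "name", "policy", "zip"],
    ["resource_id", "resource_name", "policy_name", "zipcode"],
)
for source_col, target_col in pairs:
    print(f"{source_col} -> {target_col}")
```

Logging the pairs in the job output makes it easy to spot a misaligned column before anything is written to Redshift.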

Upvotes: 2
