Reputation: 422
I am moving data from S3 into a Postgres RDS instance using an AWS Glue script. One column (images) in the Postgres database has the jsonb type.
Is it possible to convert a string into JSON so that the Glue script can save it into a jsonb column?
This is the script I use in AWS Glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test_database", table_name = "s3_source", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("title", "string", "title", "string"), ("images", "string", "images", "string")], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["images", "title", "id"], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "test_database", table_name ="rds_target", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "test_database", table_name = "rds_target", transformation_ctx = "datasink5")
job.commit()
Upvotes: 2
Views: 1270
Reputation: 21
I managed to solve it thanks to https://stackoverflow.com/a/65821468/2797747
I replaced my old write_dynamic_frame call
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = my_dyn_frame, catalog_connection = "mydb", connection_options = {"dbtable": "mytable", "database": "mydb"}, transformation_ctx = "datasink4")
with:
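# Convert the DynamicFrame to a Spark DataFrame so the plain JDBC writer can be used
df = my_dyn_frame.toDF()
url = 'jdbc:postgresql://<path>:5432/<database>'
properties = {
    'user': '*****',
    'password': '*****',
    'driver': 'org.postgresql.Driver',
    # Send strings as untyped parameters so Postgres can cast them to jsonb
    'stringtype': 'unspecified',
}
df.write.jdbc(url, table="mytable", mode="append", properties=properties)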
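One caveat with this approach: the server casts the text to jsonb at insert time, so a row whose images value is not valid JSON will make the write fail. Below is a minimal sketch of a pre-check that could be run on the string before writing. The helper name normalize_json_text is hypothetical (it is not part of Glue or Spark), and it assumes the column holds one JSON document per row.

```python
import json
from typing import Optional


def _reject_constant(name: str):
    # Python's json module accepts NaN/Infinity, but Postgres jsonb does not.
    raise ValueError(f"unsupported JSON constant: {name}")


def normalize_json_text(value: Optional[str]) -> Optional[str]:
    """Return compact JSON text if `value` parses as JSON, otherwise None.

    Hypothetical helper: None stands for "do not write this value as jsonb".
    """
    if value is None:
        return None
    try:
        parsed = json.loads(value, parse_constant=_reject_constant)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return None
    # Re-serialize so every row carries the same compact representation.
    return json.dumps(parsed, separators=(",", ":"))
```

In a Glue job this could probably be wrapped in a Spark UDF and applied to the images column before df.write.jdbc, so that invalid rows end up as NULL or get filtered out. I have not tested that part against RDS.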
Upvotes: 2