Joao Sousa

Reputation: 11

AWS Glue job to merge columns into a timestamp

I'm very new to AWS Glue and Spark. I'm trying to run an ETL job on data that is currently parsed into three separate columns (year, month and day), and I need to merge these columns into a datetime (or timestamp) format. Glue generated a basic script that I'm trying to add this logic to, but I'm having little success.

Here is the relevant part of the code:

timestampedDf = dropnullfields3.toDF()
timestampedDf = timestampedDf.withColumn("snap_timestamp", datetime.date(year=int(timestampedDf['year']),day=int(timestampedDf['day']),month=int(timestampedDf['month']))
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestamped4")

And the logger gives me back the following error:

Syntax Error: File "/tmp/g-8b0c4794d23f8afeb757fae2a20be7a4b9222fef-5379414877065320437/script_2019-03-20-14-12-14.py", line 40
    timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestamped4")
SyntaxError: invalid syntax

And here is the complete code for reference.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "perseus-reporting-db", table_name = "charges_dev_perseus_reporting", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "perseus-reporting-db", table_name = "charges_dev_perseus_reporting", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("amount", "double", "amount", "double"), ("customerid", "string", "customerid", "string"), ("status", "string", "status", "string"), ("createdat", "string", "createdat", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("amount", "double", "amount", "double"), ("customerid", "string", "customerid", "string"), ("status", "string", "status", "string"), ("createdat", "string", "createdat", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

timestampedDf = dropnullfields3.toDF()
timestampedDf = timestampedDf.withColumn("snap_timestamp", datetime.date(year=int(timestampedDf['year']),day=int(timestampedDf['day']),month=int(timestampedDf['month']))
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestamped4")

## @type: DataSink
## @args: [catalog_connection = "s3-rds-conn-perseus", connection_options = {"dbtable": "charges_dev_perseus_reporting", "database": "reporting-db"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = timestamped4, catalog_connection = "s3-rds-conn-perseus", connection_options = {"dbtable": "charges_dev_perseus_reporting", "database": "reporting-db"}, transformation_ctx = "datasink4")
job.commit()

Thanks in advance!

Upvotes: 1

Views: 2226

Answers (1)

Yuriy Bondaruk

Reputation: 4750

Try using the to_date() and concat() Spark functions:

from pyspark.sql.functions import concat, to_date, col, lit

timestampedDf = dropnullfields3.toDF()
timestampedDf = timestampedDf.withColumn("snap_timestamp", to_date(concat(col('year'), lit('-'), col('month'), lit('-'), col('day'))))
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestamped4")
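
If you need a full timestamp rather than a date, a variation using to_timestamp() with an explicit format pattern should also work. This is only a sketch, assuming Spark 2.2+ and that year, month and day are plain digit strings (the 'yyyy-M-d' pattern also accepts zero-padded values). Note that DynamicFrame has to be imported for the last line to work:

from pyspark.sql.functions import concat, to_timestamp, col, lit
from awsglue.dynamicframe import DynamicFrame

timestampedDf = dropnullfields3.toDF()
# Build a 'year-month-day' string from the three columns and parse it as a timestamp.
timestampedDf = timestampedDf.withColumn(
    "snap_timestamp",
    to_timestamp(concat(col('year'), lit('-'), col('month'), lit('-'), col('day')), 'yyyy-M-d')
)
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestamped4")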

Upvotes: 1
