Reputation: 193
I load JSON data and use the relationalize method on a DynamicFrame to flatten the nested JSON object, then save it in Parquet format for faster Athena queries. The problem is that the saved column names contain dots, which conflicts with Athena's SQL query syntax, so I am unable to run column-specific queries.
To tackle this problem I rename the columns in the Glue job, replacing the dots with underscores. My question is which of the two approaches below is better, and why (efficiency: memory, execution speed on the nodes, etc.)?
Also, given the horrible AWS Glue documentation, I could not come up with a DynamicFrame-only solution. I have problems getting the column names dynamically, so I am using toDF().
1) The first approach gets the column names from a DataFrame extracted from the DynamicFrame, then renames the fields on the DynamicFrame itself:
relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
    relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)
2) The second approach would be to extract the DataFrame from the DynamicFrame, rename the columns on the PySpark DataFrame (instead of the DynamicFrame), then convert back to a DynamicFrame and save it in Parquet format.
Is there a better approach? Can a crawler rename columns? How fast is the .fromDF() method? Is there better documentation on functions and methods than the PDF developer guide?
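A minimal sketch of what this second approach could look like. The helper name underscore_columns is hypothetical; it relies only on the columns property and the toDF(*names) method of a PySpark DataFrame. The commented usage assumes the job's GlueContext is available as glueContext, which is not shown in the snippet above.

```python
def underscore_columns(df):
    """Return df with every '.' in a column name replaced by '_'.

    Works on any object that offers `columns` and `toDF(*names)`,
    which a pyspark.sql.DataFrame does.
    """
    new_names = [name.replace(".", "_") for name in df.columns]
    # toDF(*names) renames all columns positionally in a single call
    return df.toDF(*new_names)


# Hypothetical use inside the Glue job (glueContext is an assumed name):
# from awsglue.dynamicframe import DynamicFrame
# df_renamed = underscore_columns(relationalize1.toDF())
# relationalize1 = DynamicFrame.fromDF(df_renamed, glueContext, "relationalize1")
```

Renaming everything in one toDF() call avoids one transform per column, though whether that matters for runtime would need to be measured on the actual job.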
Upvotes: 4
Views: 5406
Reputation: 84
The question specifically asks about renaming:
(a) Convert to DataFrame.
(b) Create a new_columns array with the desired column names in the same order as old_columns.
(c) Overwrite and persist new_columns using functools.reduce() and PySpark's DataFrame.withColumnRenamed().
(d) Convert back to DynamicFrame.
from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce
JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)
# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://path/to/source/file.csv"]},
    format_options={"withHeader": True, "separator": chr(44)},  # comma delimited
)
# (a) Convert to DataFrame
df = datasource.toDF()
# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
    field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]
# (c) Overwrite and persist `new_columns`
df = reduce(
    lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    df,
)
# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")
# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
    frame=datasource,
    connection_type="s3",
    connection_options={"path": "s3://path/to/target/prefix/"},
    format="parquet",
)
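One thing worth checking before step (c): lowercasing and replacing spaces and dots can map two different source columns to the same new name (for example "a.b" and "a_b"), which may leave the frame with ambiguous column names. A small plain-Python sketch of such a check follows; sanitize and find_collisions are hypothetical helper names, and sanitize simply mirrors the expression used in step (b).

```python
from collections import defaultdict


def sanitize(name):
    # Same transformation as in step (b)
    return name.lower().replace(" ", "_").replace(".", "_")


def find_collisions(old_columns):
    """Map each sanitized name to the original names that produce it,
    keeping only names produced by more than one original column."""
    groups = defaultdict(list)
    for name in old_columns:
        groups[sanitize(name)].append(name)
    return {new: olds for new, olds in groups.items() if len(olds) > 1}


# Hypothetical use before renaming:
# clashes = find_collisions(df.schema.names)
# if clashes:
#     raise ValueError("column names collide after renaming: %s" % clashes)
```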
Upvotes: 6
Reputation: 420
You can access the schema of the DynamicFrame with the schema() method. From that you can define a mapping from any columns containing . to new columns that use _ instead. You just need to know the types and names of the columns to do this with the ApplyMapping transformation.
Maybe:
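from awsglue.transforms import ApplyMapping
# df is the DynamicFrame to rename
# construct renaming mapping for ApplyMapping; every column is listed,
# since ApplyMapping keeps only the fields that appear in the mapping
mappings = list()
for field in df.schema().fields:
    dtype = field.dataType.typeName()
    # a source name that contains dots must be wrapped in back-ticks
    source = '`' + field.name + '`' if '.' in field.name else field.name
    mappings.append((source, dtype, field.name.replace('.', '_'), dtype))
# apply mapping
renamed = ApplyMapping.apply(frame=df, mappings=mappings)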
Upvotes: 2