Reputation: 524
I've tried to concatenate a set of DynamicFrame objects in order to build a bigger, composite one within a Glue job. According to the Glue docs there are only a few methods available on the DynamicFrameCollection class, and none of them allows this kind of operation. Has anyone tried to perform something similar?
A collection is a key-indexed structure and looks like the following within the GlueContext, where each datasource object is a table parsed from Parquet:
df_dic = {
    "datasource0": datasource0,
    "datasource1": datasource1,
    "datasourcen": datasourcen,
}

dfc = DynamicFrameCollection(dynamic_frames=df_dic, glue_ctx=glueContext)
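For context, the collection API only offers key-based lookup and per-frame transforms, nothing that concatenates. A minimal sketch, assuming the dfc above:

# The collection supports key lookup and per-frame transforms only;
# none of these operations combines the frames into one.
print(list(dfc.keys()))                  # ["datasource0", "datasource1", "datasourcen"]
datasource0 = dfc.select("datasource0")  # pull a single DynamicFrame back out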
Here each DynamicFrame is read using the create_dynamic_frame.from_options method:
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://{ENV_BUCKET}/parquet/{list_tables[0]}/store_module={store_module}"
        ]
    },
    format="parquet",
    # format_options={},
    transformation_ctx="datasource0",
)
Upvotes: 0
Views: 2226
Reputation: 524
I ended up using a workaround: it can be achieved with the low-level DynamicFrame API, without converting to a Spark DataFrame, by merging iteratively with the mergeDynamicFrame method.
from awsglue.dynamicframe import DynamicFrame


def CustomTransform(prefix: str, store_module: int) -> DynamicFrame:
    """Read one partitioned Parquet table from S3 as a DynamicFrame.

    Parameters
    ----------
    prefix : str
        Table prefix under the parquet/ key of the bucket.
    store_module : int
        Partition value to read.

    Returns
    -------
    DynamicFrame
        The frame read from S3.
    """
    logger.info(f"Fetching DynamicFrame: {prefix}")
    datasource = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [
                f"s3://{ENV_BUCKET}/parquet/{prefix}/store_module={store_module}"
            ]
        },
        format="parquet",
        # format_options={},
        transformation_ctx="datasource",
    )
    return datasource
datasource0 = CustomTransform(list_tables[0], store_module)

# Iterate over the remaining DynamicFrames listed in `list_tables`
for idx in range(1, len(list_tables)):
    datasourcex = CustomTransform(list_tables[idx], store_module)
    swp_datasource = datasource0.mergeDynamicFrame(
        stage_dynamic_frame=datasourcex,
        primary_keys=["id"],
        transformation_ctx="swp_datasource",
    )
    datasource0 = swp_datasource
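One caveat: mergeDynamicFrame merges on the given primary keys, so staging records whose id matches an existing record overwrite it rather than being appended; it behaves like an upsert, not a pure concatenation, which was acceptable for my case. The same fold can also be written more compactly; a minimal sketch, assuming list_tables and store_module from above:

from functools import reduce

# Fold the per-table DynamicFrames into one, merging on "id" at each step.
frames = [CustomTransform(prefix, store_module) for prefix in list_tables]
datasource0 = reduce(
    lambda acc, nxt: acc.mergeDynamicFrame(
        stage_dynamic_frame=nxt,
        primary_keys=["id"],
        transformation_ctx="swp_datasource",
    ),
    frames,
)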
Upvotes: 0
Reputation: 6998
You can convert them to data frames by calling the .toDF() method. Then you can use this method to union data frames regardless of their schemas:
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit


def union_with_different_columns(data_frame_1: DataFrame, data_frame_2: DataFrame) -> DataFrame:
    """This method unites two data frames with different columns by name,
    setting the columns that are not present in the other data frame
    to null."""
    assert data_frame_1 is not None
    assert data_frame_2 is not None
    # Add each column missing from one side to the other as a null literal
    for column in [column for column in data_frame_1.columns if column not in data_frame_2.columns]:
        data_frame_2 = data_frame_2.withColumn(column, lit(None))
    for column in [column for column in data_frame_2.columns if column not in data_frame_1.columns]:
        data_frame_1 = data_frame_1.withColumn(column, lit(None))
    return data_frame_1.unionByName(data_frame_2)
unioned_dynamicFrame = DynamicFrame.fromDF(
    union_with_different_columns(datasource0.toDF(), datasource1.toDF()),
    glue_context,
    "dynamic_frame",
)
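To cover all the frames from the question, the pairwise union can simply be chained. A minimal sketch, assuming datasource0 through datasourcen and a glue_context from above:

from awsglue.dynamicframe import DynamicFrame

# Chain the pairwise union across every frame, then convert back once.
unioned = datasource0.toDF()
for frame in [datasource1, datasourcen]:
    unioned = union_with_different_columns(unioned, frame.toDF())
unioned_dynamicFrame = DynamicFrame.fromDF(unioned, glue_context, "dynamic_frame")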
Upvotes: 1