Miguel Rueda

Reputation: 524

Is there any method to concatenate/unite DynamicFrame objects in AWS Glue?

I've tried to concatenate a set of DynamicFrame objects in order to build one larger composite frame within a Glue job. According to the Glue docs, the DynamicFrameCollection class exposes only a few methods, and none of them supports this kind of operation. Has anyone tried to do something similar?

A collection is a key-indexed structure and looks like the following within the GlueContext, where each datasource object is a parsed table in Parquet format.

df_dic = {
    "datasource0": datasource0,
    "datasource1": datasource1,
    "datasourcen": datasourcen,
}
dfc = DynamicFrameCollection(dynamic_frames=df_dic, glue_ctx=glueContext)

Here each DynamicFrame is read using the create_dynamic_frame.from_options method.

datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://{ENV_BUCKET}/parquet/{list_tables[0]}/store_module={store_module}"
        ]
    },
    format="parquet",
    # format_options={},
    transformation_ctx="datasource0",
)

Upvotes: 0

Views: 2226

Answers (2)

Miguel Rueda

Reputation: 524

I ended up using a workaround: it can be done with the low-level DynamicFrame API, without converting to a Spark DataFrame, by iteratively applying the mergeDynamicFrame method.

def CustomTransform(prefix: str, store_module: int) -> DynamicFrame:
    """Read one Parquet table from S3 as a DynamicFrame.

    Parameters
    ----------
    prefix : str
        Table prefix under the `parquet/` path in the bucket.
    store_module : int
        Partition value selecting the `store_module=<value>` partition.

    Returns
    -------
    DynamicFrame
        The DynamicFrame read from S3.
    """
    logger.info(f"Fetching DynamicFrame: {prefix}")
    datasource = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [
                f"s3://{ENV_BUCKET}/parquet/{prefix}/store_module={store_module}"
            ]
        },
        format="parquet",
        # format_options={},
        transformation_ctx="datasource",
    )
    return datasource

datasource0 = CustomTransform(list_tables[0], store_module)
# Iterates over other DynamicFrames listed as `list_tables`
for idx in range(1, len(list_tables)):
    datasourcex = CustomTransform(list_tables[idx], store_module)
    swp_datasource = datasource0.mergeDynamicFrame(
        stage_dynamic_frame=datasourcex,
        primary_keys=["id"],
        transformation_ctx="swp_datasource",
    )
    datasource0 = swp_datasource
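One caveat worth keeping in mind: mergeDynamicFrame is an upsert keyed on the primary keys, not a pure append, so staging records replace source records that share the same id. A minimal plain-Python sketch of that semantics (illustrative names, no Glue dependency):

```python
def merge_by_primary_key(source, staging, key="id"):
    """Sketch of mergeDynamicFrame semantics: staging rows replace
    source rows with the same primary key; new staging rows are added."""
    merged = {row[key]: row for row in source}
    for row in staging:  # staging rows win on key collisions
        merged[row[key]] = row
    return list(merged.values())

source = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
staging = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
print(merge_by_primary_key(source, staging))
```

If the tables have no overlapping ids, this behaves like a concatenation; with overlaps, only the last-merged record per id survives.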

Upvotes: 0

Robert Kossendey

Reputation: 6998

You can convert them to DataFrames by calling the .toDF() method. Then you can use this function to union DataFrames regardless of their schemas:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def union_with_different_columns(data_frame_1: DataFrame, data_frame_2: DataFrame) -> DataFrame:
    """This method unites two data frames with different columns by name,
    setting the columns that are not present in the other data frame
    to null"""
    assert data_frame_1 is not None
    assert data_frame_2 is not None

    for column in [column for column in data_frame_1.columns if column not in data_frame_2.columns]:
        data_frame_2 = data_frame_2.withColumn(column, lit(None))

    for column in [column for column in data_frame_2.columns if column not in data_frame_1.columns]:
        data_frame_1 = data_frame_1.withColumn(column, lit(None))

    return data_frame_1.unionByName(data_frame_2)

unioned_dynamicFrame = DynamicFrame.fromDF(
    union_with_different_columns(datasource0.toDF(), datasource1.toDF()),
    glue_context,
    'dynamic_frame',
)
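The column-alignment step above can be illustrated without Spark: pad each record with null for the columns it lacks, so both sides share one schema before the union. A plain-Python sketch with illustrative names only:

```python
def align_and_union(rows_1, cols_1, rows_2, cols_2):
    """Sketch of the schema-alignment idea: build the combined column
    set, fill missing columns with None, then concatenate the rows."""
    all_cols = list(dict.fromkeys(cols_1 + cols_2))  # preserve order, dedupe

    def pad(row):
        return {c: row.get(c) for c in all_cols}

    return [pad(r) for r in rows_1] + [pad(r) for r in rows_2]

print(align_and_union([{"id": 1, "a": "x"}], ["id", "a"],
                      [{"id": 2, "b": "y"}], ["id", "b"]))
# each row now carries the keys id, a, and b; missing values are None
```

unionByName then matches columns by name rather than position, which is why the padding must happen first.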

Upvotes: 1
