Reputation: 1370
I need to copy data from one dynamo table to another and do some transformation along the way. For that, I exported data from the source table to s3 and ran crawler over it. In my Glue Job I'm using following code:
mapped = apply_mapping.ApplyMapping.apply(
    frame=source_df,
    mappings=[
        ("item.uuid.S", "string", "uuid", "string"),
        ("item.options.SS", "set", "options", "set"),
        ("item.updatedAt.S", "string", "updatedAt", "string"),
        ("item.createdAt.S", "string", "createdAt", "string")
    ],
    transformation_ctx='mapped'
)
df = mapped.toDF()  # convert to Spark DataFrame
# apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df')  # convert back to DynamicFrame
glue_context.write_dynamic_frame_from_options(
    frame=target_df,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "eu-west-1",
        "dynamodb.output.tableName": "my-table",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
In the source DynamoDB table the options field is a String Set. The transformation leaves it untouched, yet in the target table it ends up as a list of strings:
"options": {
"L": [
{
"S": "option A"
},
{
"S": "option B"
}
]
}
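What I expect instead is the String Set representation the attribute has in the source table, i.e.:
"options": {
    "SS": [
        "option A",
        "option B"
    ]
}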
Could anyone advise how to write a string set into DynamoDB using AWS Glue?
Upvotes: 6
Views: 2412
Reputation: 1370
Unfortunately, I couldn't find a way to write string sets to DynamoDB using the Glue interfaces. I found some solutions that use boto3 together with Spark, so here is mine. I skipped the transformation part and simplified the example in general.
import boto3
from awsglue.transforms import ApplyMapping

# Load source data from the catalog
source_dyf = glue_context.create_dynamic_frame_from_catalog(
    GLUE_DB, GLUE_TABLE, transformation_ctx="source"
)

# Map the exported DynamoDB attributes
mapped_dyf = ApplyMapping.apply(
    frame=source_dyf,
    mappings=[
        ("item.uuid.S", "string", "uuid", "string"),
        ("item.options.SS", "set", "options", "set"),
        ("item.updatedAt.S", "string", "updatedAt", "string"),
        ("item.createdAt.S", "string", "createdAt", "string")
    ],
    transformation_ctx='mapped'
)
def _putitem(items):
    resource = boto3.resource("dynamodb", region_name="eu-west-1")
    table = resource.Table("new_table")
    with table.batch_writer() as batch_writer:
        for item in items:
            row = item.asDict()
            # boto3 writes a Python set as a DynamoDB String Set (SS)
            row["options"] = set(row["options"])
            batch_writer.put_item(Item=row)
    # mapPartitions expects an iterable back
    return []

df = mapped_dyf.toDF()
# Apply Spark transformations ...

# Save the partitions to DynamoDB
df.rdd.mapPartitions(_putitem).collect()
Depending on your data volume, you might want to increase the number of retries in boto3 or even change the write mechanism. You might also want to play with the DynamoDB provisioning; I switched the table to on-demand capacity to run this particular migration, but there is a catch
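For the retries, this is a minimal sketch of the kind of boto3 client configuration I mean; the retry numbers are only illustrative, and the region and table name are taken from the example above:
import boto3
from botocore.config import Config

# Illustrative retry settings; tune max_attempts to your data volume
dynamo_config = Config(retries={"max_attempts": 10, "mode": "standard"})
resource = boto3.resource("dynamodb", region_name="eu-west-1", config=dynamo_config)
table = resource.Table("new_table")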
Upvotes: 5
Reputation: 20830
You can try using the ResolveChoice class to convert the datatype.
There are four resolution actions (cast, project, make_cols, make_struct) that a column with an ambiguous type can be resolved with.
Something like this might help:
resolvedMapping = ResolveChoice.apply(frame=mapped, specs=[("item.options.SS", "make_struct")])
You can refer to the link for details:
https://github.com/aws-samples/aws-glue-samples/blob/master/examples/resolve_choice.md
Upvotes: 0