Greetings, all experts.
I've run into a problem and need a solution. Any help would be appreciated.
I have an AWS Glue DynamicFrame created from an XML file stored in S3.
It has a nested field, 'ReceiptNumber', and the DynamicFrame's schema looks like this:
root
|-- Receipt: struct
| |-- Front: struct
| | |-- FrontNumber: string
| | |-- CountryorTerritoryCode: string
| | |-- TaxId: string
| |-- ReceiptAmount: double
| |-- ReceiptCurrencyCode: string
| |-- ReceiptDateCCYYMMDD: int
| |-- ReceiptNumber: double
| |-- TaxVarianceAmount: double
| |-- TransferDetails: array
| | |-- element: struct
| | | |-- BillCategoryCode: string
| | | |-- BillCategoryDetailCode: string
| | | |-- Porting: array
| | | | |-- element: struct
| | | | | |-- AddressDetails: struct
| | | | | | |-- ConsigneeAddress: struct
| | | | | | | |-- Address: struct
| | | | | | | | |-- AddressText2: string
| | | | | | | | |-- CityName: string
| | | | | | | | |-- CountryorTerritoryCode: string
| | | | | | | | |-- PostalCode: string
| | | | | | | | |-- StateCode: string
| | | | | | | | |-- StreetAddress: string
| | | | | | | |-- Addressee: struct
| | | | | | | | |-- Name: string
| | | | | | | |-- Attention: struct
| | | | | | | | |-- Name: string
| | | | | | |-- SenderAddress: struct
| | | | | | | |-- Address: struct
| | | | | | | | |-- CityName: string
| | | | | | | | |-- CountryorTerritoryCode: string
| | | | | | | | |-- PostalCode: string
| | | | | | | | |-- StateCode: string
| | | | | | | | |-- StreetAddress: string
| | | | | | | |-- Addressee: struct
| | | | | | | | |-- Name: string
| | | | | | | |-- Attention: struct
| | | | | | | | |-- Name: string
| | | | | | |-- ThirdPartyAddress: struct
| | | | | | | |-- Address: struct
| | | | | | | | |-- CityName: string
| | | | | | | | |-- CountryorTerritoryCode: string
| | | | | | | | |-- PostalCode: string
| | | | | | | | |-- StreetAddress: string
| | | | | | | |-- Addressee: struct
| | | | | | | | |-- Name: string
| | | | | | | |-- Attention: struct
| | | | | | | | |-- Name: string
| | | | | |-- BillOptionCode: string
| | | | | |-- LeadPortingNumber: string
| | | | | |-- Package: array
| | | | | | |-- element: struct
| | | | | | | |-- BillDetails: struct
| | | | | | | | |-- Bill: array
| | | | | | | | | |-- element: struct
| | | | | | | | | | |-- BillInformation: array
| | | | | | | | | | | |-- element: struct
| | | | | | | | | | | | |-- BasisCurrencyCode: string
| | | | | | | | | | | | |-- BasisValue: double
| | | | | | | | | | | | |-- BilldUnitQuantity: int
| | | | | | | | | | | | |-- CurrencyCode: string
| | | | | | | | | | | | |-- DescriptionCode: string
| | | | | | | | | | | | |-- DescriptionOfBills: string
| | | | | | | | | | | | |-- ExemptionAmount: double
| | | | | | | | | | | | |-- IncentiveAmount: double
| | | | | | | | | | | | |-- NetAmount: double
| | | | | | | | | | | | |-- TaxIndicator: double
| | | | | | | | | | |-- ClassificationCode: string
| | | | | | | |-- ContainerType: string
| | | | | | | |-- MiscellaneousDetails: struct
| | | | | | | | |-- MiscellaneousLineItems: struct
| | | | | | | | | |-- LineItem: struct
| | | | | | | | | | |-- LineNumber: int
| | | | | | | | | | |-- LineText: string
| | | | | | | |-- PackageBillableKeyedDimensions: struct
| | | | | | | | |-- Height: double
| | | | | | | | |-- Length: double
| | | | | | | | |-- Width: double
| | | | | | | |-- PackageDimension: struct
| | | | | | | | |-- Height: double
| | | | | | | | |-- Length: double
| | | | | | | | |-- UnitOfMeasure: string
| | | | | | | | |-- Width: double
| | | | | | | |-- PackageKeyedDimensions: struct
| | | | | | | | |-- Height: double
| | | | | | | | |-- Length: double
| | | | | | | | |-- UnitOfMeasure: string
| | | | | | | | |-- Width: double
| | | | | | | |-- PackageQuantity: struct
| | | | | | | | |-- ActualQuantity: struct
| | | | | | | | | |-- Quantity: int
| | | | | | | |-- PackageWeight: struct
| | | | | | | | |-- ActualWeight: struct
| | | | | | | | | |-- UnitOfMeasure: string
| | | | | | | | | |-- Weight: double
| | | | | | | | |-- BilledWeight: struct
| | | | | | | | | |-- UnitOfMeasure: string
| | | | | | | | | |-- Weight: double
| | | | | | | | |-- BilledWeightType: double
| | | | | | | |-- TrackingNumber: string
| | | | | | | |-- Zone: int
| | | | | |-- PayerRoleCd: int
| | | | | |-- PickUpRecordNumber: long
| | | | | |-- PortingReferences: struct
| | | | | | |-- Reference: array
| | | | | | | |-- element: struct
| | | | | | | | |-- ReferenceNumber: string
| | | | | | | | |-- Sequence: int
| | | | | |-- TransferDateCCYYMMDD: int
| |-- TypeCode: string
| |-- TypeDetailCode: double
Before writing the DynamicFrame out, I want to change 'ReceiptNumber' to a string type, like this:
....
....
| |-- ReceiptCurrencyCode: string
| |-- ReceiptDateCCYYMMDD: int
| |-- ReceiptNumber: string
| |-- TaxVarianceAmount: double
....
....
Is this possible with apply_mapping?
If not, is there an alternative solution?
I had a similar problem where I had to add, delete, and change the types of many columns. I ended up using the Map transform, which applies a function to every record of a DynamicFrame.
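import datetime
import time
from typing import Any, Dict
from awsglue.transforms import Map

inputDyf = glueContext.create_dynamic_frame_from_options(
    ...
)

def mapping(record: Dict[str, Any]) -> Dict[str, Any]:
    # add a new field: today's date (local midnight) as a Unix timestamp
    record["UpdatedAt"] = int(time.mktime(datetime.date.today().timetuple()))
    # cast an existing field to another type
    record["SomeVal"] = int(record["SomeVal"])
    # ... put, del and other dict operations
    return record

mapped_dyF = Map.apply(frame=inputDyf, f=mapping)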
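For the case in the question, the record function only has to touch the nested key. This is a minimal sketch, assuming the Glue record behaves like a nested dict (nested structs as dicts, with .get supported); the name stringify_receipt_number is hypothetical. Because it is plain Python, it can be checked locally on a hand-made record before being passed to Map.apply:

```python
from typing import Any, Dict


def stringify_receipt_number(record: Dict[str, Any]) -> Dict[str, Any]:
    """Cast record["Receipt"]["ReceiptNumber"] to str and leave every other field as-is.

    Assumes a nested, dict-like record shaped like the schema in the question.
    """
    receipt = record.get("Receipt")
    if receipt is not None:
        value = receipt.get("ReceiptNumber")
        if isinstance(value, float) and value.is_integer():
            # avoid a trailing ".0": 1234567.0 -> 1234567 -> "1234567"
            value = int(value)
        if value is not None:
            receipt["ReceiptNumber"] = str(value)
    return record


# Local check on a hand-made record (no Glue needed)
print(stringify_receipt_number({"Receipt": {"ReceiptNumber": 1234567.0, "ReceiptAmount": 10.5}}))
# → {'Receipt': {'ReceiptNumber': '1234567', 'ReceiptAmount': 10.5}}

# In the job (assumption): mapped_dyF = Map.apply(frame=inputDyf, f=stringify_receipt_number)
```

Whole-number doubles go through int first so the string has no trailing ".0". Note that this cannot restore digits that were already lost when a very long receipt number was inferred as a double.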
You can also pass an explicit schema to create_dynamic_frame_from_options when reading XML, through the withSchema format option:
import json
from awsglue.gluetypes import Field, StringType, StructType

schema = StructType([
    Field("name", StringType()),
])
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://xml_bucket/someprefix"]},
    format="xml",
    format_options={"withSchema": json.dumps(schema.jsonValue())},
    transformation_ctx="",
)
# or pass the schema directly as a JSON string
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://xml_bucket/someprefix"]},
    format="xml",
    format_options={
        "withSchema": """
        {
            "dataType": "struct",
            "properties": {},
            "fields": [
                {
                    "name": "name",
                    "container": {
                        "dataType": "string",
                        "properties": {}
                    },
                    "properties": {}
                }
            ]
        }
        """},
    transformation_ctx="",
)
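Rather than hand-writing that JSON string, it could be generated from a nested Python dict. This is a sketch, assuming nested structs are encoded with the same dataType/fields/container layout as the top-level struct in the string above; arrays are not handled, and the helper names glue_type_json and with_schema_option are hypothetical:

```python
import json
from typing import Any, Dict, Union


def glue_type_json(spec: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Build the Glue schema JSON for one type.

    A str is treated as a primitive type name ("string", "double", ...);
    a dict is treated as a struct whose keys are field names.
    """
    if isinstance(spec, str):
        return {"dataType": spec, "properties": {}}
    return {
        "dataType": "struct",
        "properties": {},
        "fields": [
            {"name": name, "container": glue_type_json(child), "properties": {}}
            for name, child in spec.items()
        ],
    }


def with_schema_option(spec: Dict[str, Any]) -> str:
    """Serialize a top-level struct spec for format_options={"withSchema": ...}."""
    return json.dumps(glue_type_json(spec))


# Reproduces the single-field schema string above
print(with_schema_option({"name": "string"}))
# A nested spec in the spirit of the question (only two fields shown)
print(with_schema_option({"Receipt": {"ReceiptNumber": "string", "ReceiptAmount": "double"}}))
```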
Finally, I was able to solve it with a slightly different approach.
To recap: I have a Glue ETL job, written as a Python script, that processes an XML file. Once the XML file is read, its schema is the one shown in the question.
I wanted to change the type of one of its nested fields, 'ReceiptNumber', to string from its inferred numeric type (double in the schema above).
First, I created a DynamicFrame from the S3 file as usual:
d0 = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path]},
    format="xml", format_options={"rowTag": "ReceiptDetails"}, transformation_ctx="d0")
Then I converted the DynamicFrame into a PySpark DataFrame:
df = d0.toDF()
Next, I used the function from the following post, which shows how to change the type of a nested struct field:
Pyspark: How to Modify a Nested Struct Field
With that function I built new_schema: the Receipt struct type with ReceiptNumber changed to string. I then round-tripped the Receipt column through JSON using that schema and converted the result into a new DynamicFrame:
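from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import from_json, to_json
# new_schema is the StructType of Receipt, with ReceiptNumber as string
df = df.withColumn("Receipt_json", to_json("Receipt")).drop("Receipt")
df = df.withColumn("Receipt", from_json("Receipt_json", new_schema)).drop("Receipt_json")
d0 = DynamicFrame.fromDF(df, glueContext, "d0")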
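The linked post is not reproduced here, but a new_schema of the kind described can be sketched by editing the JSON form of a Spark schema. This is a swapped-in illustration, not the linked function: set_nested_type is a hypothetical helper that walks only through struct fields (not arrays) and returns a copy with one field retyped. The pure-Python part runs without Spark; the PySpark lines at the end are comments showing the assumed usage.

```python
import copy
from typing import Any, Dict, List


def set_nested_type(schema: Dict[str, Any], path: List[str], new_type: Any) -> Dict[str, Any]:
    """Return a copy of a Spark schema in StructType.jsonValue() form with the field at path retyped.

    Only struct fields are walked; a missing path segment raises KeyError.
    """
    result = copy.deepcopy(schema)  # leave the caller's schema untouched
    node = result
    for depth, name in enumerate(path):
        field = next((f for f in node["fields"] if f["name"] == name), None)
        if field is None:
            raise KeyError(".".join(path[: depth + 1]))
        if depth == len(path) - 1:
            field["type"] = new_type  # e.g. "string"
        else:
            node = field["type"]  # expected to be {"type": "struct", "fields": [...]}
    return result


# Assumed usage in the Glue job (requires PySpark, not run here):
# from pyspark.sql.types import StructType
# receipt_json = df.schema["Receipt"].dataType.jsonValue()
# new_schema = StructType.fromJson(set_nested_type(receipt_json, ["ReceiptNumber"], "string"))
```

Working on the JSON dict keeps the helper independent of the PySpark type classes; StructType.fromJson turns the edited dict back into a schema object that from_json accepts.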
From the new DynamicFrame, in which 'ReceiptNumber' is now a string, I serialized the schema to JSON:
import json
receiptSchema = d0.schema()
withReceiptSchema = json.dumps(receiptSchema.jsonValue())
Finally, I re-read the XML file with this schema passed as withSchema, and wrote the result out as JSON:
d0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3", connection_options={"paths": [s3_path]}, format="xml",
    format_options={"withSchema": withReceiptSchema, "rowTag": "ReceiptDetails"}, transformation_ctx="d0")
# write the data, now typed with the new schema, to S3 as JSON
glueContext.write_dynamic_frame.from_options(
    frame=d0, connection_type="s3", connection_options={"path": s3_write_path}, format="json")
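If all that is needed is the withSchema string, the DataFrame round trip might be avoidable: the Glue schema from the first read could be patched directly and passed back as withSchema. A sketch, assuming d0.schema().jsonValue() uses the same name/container/dataType layout as the withSchema string in the previous answer and that nested structs nest the same way; retype_glue_field is a hypothetical helper and arrays are not handled:

```python
import json
from typing import Any, Dict, List


def retype_glue_field(schema: Dict[str, Any], path: List[str], new_type: str) -> str:
    """Return a withSchema JSON string with the field at path changed to new_type.

    schema is assumed to be a Glue schema dict such as d0.schema().jsonValue();
    only nested structs are walked (arrays are not handled).
    """
    patched = json.loads(json.dumps(schema))  # deep copy of plain JSON data
    node = patched
    for depth, name in enumerate(path):
        field = next((f for f in node["fields"] if f["name"] == name), None)
        if field is None:
            raise KeyError(".".join(path[: depth + 1]))
        if depth == len(path) - 1:
            field["container"] = {"dataType": new_type, "properties": {}}
        else:
            node = field["container"]  # expected to be a nested struct
    return json.dumps(patched)


# Assumed usage in the job (not run here):
# withReceiptSchema = retype_glue_field(d0.schema().jsonValue(), ["Receipt", "ReceiptNumber"], "string")
# then re-read the XML with format_options={"withSchema": withReceiptSchema, "rowTag": "ReceiptDetails"}
```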
I hope this helps anyone who runs into a similar roadblock while working on AWS Glue jobs.