ResponsiblyUnranked

Reputation: 1816

AWS Glue with PySpark - DynamicFrame export to S3 fails partway through with UnsupportedOperationException

I should preface this by saying I've been using AWS Glue Studio to learn how to use Glue with PySpark, and so far it's been going really well. That was until I encountered an error which I cannot understand (let alone solve). An example of the data can be found at the bottom.

Context

All I was doing was a simple data transformation: Input S3 Bucket --> CustomTransform --> Output S3. But the job keeps crashing after exporting some of the data. As I mention later, I even tried removing the CustomTransform entirely, but the S3 export still failed, even when just going from one bucket to the other.
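
For reference, the script Glue Studio generated is essentially the standard boilerplate. Here's a minimal sketch of its shape (the bucket paths are placeholders and the custom transform body is omitted, so this isn't the exact job):

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# Standard Glue boilerplate: set up the Spark/Glue contexts and initialise the job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Input S3 bucket -> DynamicFrame (schema inferred from the parquet files).
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://example-input-bucket/data/"]},
    transformation_ctx="DataSource0",
)

# The CustomTransform step would go here; removing it made no difference.

# Output S3 bucket -- this write is the call that fails partway through.
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame=DataSource0,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://example-bucket-here/data/",
        "compression": "snappy",
        "partitionKeys": [],
    },
    transformation_ctx="DataSink0",
)
job.commit()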

The Error

Here is the Python part of the error I'm getting (copied from CloudWatch):

2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/GlueTest.py", line 69, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
    "path": "s3://example-bucket-here/data/",
    "compression": "snappy",
    "partitionKeys": []
}, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

The real puzzle

What's confusing me most is that this crash happens after it has already exported most of the data to S3. That would immediately suggest something is wrong with the data: the job reaches some corrupt (or poorly formatted) records and then crashes.

So I compared the successfully exported data against the input data and identified all the rows that were not exported. Nothing about them struck me as strange or as a likely cause of the export failing.

It might help to know that the schema is being inferred by AWS Glue when I select the S3 Bucket as an input source.

What I've tried

So I tried exporting the data in every format that Glue supports, but none worked. I also tried skipping all the data transformations and just taking the Input S3 Bucket and exporting straight to the Output S3 Bucket, but it still crashed with the same error (in fact, that's the error message I included above!).

Again, that all suggests something is wrong with the data, but I've looked through all the data that didn't make it through the process (only about 180 records) and it all looks just like the data that did make it through.

And as a sanity check, I used the Input S3 --> Output S3 method on some other (quite similar) data and it worked fine, basically acting as a copy-paste.

I also came across this article, but it didn't really help: when I tried changing the output format to get more information, I was met with the same error and no extra detail.

Is anyone able to help identify the issue here? Nothing suggests this should be crashing. I'm happy to provide the rest of the Java stack trace if that would help.

Data Example

Here is what my data looks like:

Date        ticker_name  currency exchange_name instrument_type first_trade_date Amount
1612229400  0382.HK      HKD      HKG           EQUITY          1563240600       0.049
1613140200  SO           USD      NYQ           EQUITY          378657000        0.64
1613053800  SIGI         USD      NMS           EQUITY          322151400        0.25
1614240000  SIGT.L       GBp      LSE           EQUITY          828601200        1.68
1612249200  SIH.BE       EUR      BER           EQUITY          1252044000       0.038

All fields are Strings apart from Date (long), first_trade_date (long) and Amount (double).

When I call .printSchema() I get the following:

root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double

Upvotes: 1

Views: 6429

Answers (1)

ResponsiblyUnranked

Reputation: 1816

The Solution

So if anyone else is having this issue: it can be frustrating, because this error gives almost no information about what's actually going wrong. One of the only clues I had was this article, which suggested there was something wrong with my schema.

I had to look at my data very closely and eventually noticed that I only got this error when certain files were processed in combination with other files.

It turns out that some of my parquet files stored the date as an int, while in others it was a float. The data was being written from a Pandas DataFrame using .to_parquet() in a different function, so I'm not sure how the inconsistency in data types crept in.
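
If anyone wants to check for the same problem, one way to spot it (not what I originally did, just a suggestion) is to read the footer schema of each input parquet file individually, for example with pyarrow and s3fs. The bucket path and column names below match my example data, so adjust them for yours:

import pyarrow.parquet as pq
import s3fs

# Print the physical type of the date columns stored in each parquet file's footer.
# Files written at different times may disagree (int64 in some, double in others),
# which is the kind of mismatch that produced the error above.
fs = s3fs.S3FileSystem()
for path in fs.ls("example-input-bucket/data/"):
    if path.endswith(".parquet"):
        schema = pq.ParquetFile(fs.open(path, "rb")).schema_arrow
        print(path, schema.field("Date").type, schema.field("first_trade_date").type)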

What puzzled me most is that even when I tried casting the date column so it was always an int (as seen here), I still got the error.
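
For context, the cast I attempted was roughly along these lines (a sketch using resolveChoice, not my exact code):

# Force the two epoch columns to long before writing.
# In my case the same UnsupportedOperationException still came back.
Resolved0 = DataSource0.resolveChoice(
    specs=[("Date", "cast:long"), ("first_trade_date", "cast:long")],
    transformation_ctx="Resolved0",
)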

Anyway, my solution was to fix the way Pandas was outputting the data and make sure it always wrote the date as an integer before Glue processed the data.
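
In other words, the fix was on the Pandas side, roughly like this (the function name is just for illustration):

import pandas as pd

def write_batch(df: pd.DataFrame, path: str) -> None:
    # Pin the epoch columns to int64 so every parquet file written over time
    # carries the same physical type, whatever dtype the values arrived with.
    df["Date"] = df["Date"].astype("int64")
    df["first_trade_date"] = df["first_trade_date"].astype("int64")
    df.to_parquet(path, index=False)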

Upvotes: 3
