Reputation: 508
I am trying to merge multiple parquet files using an AWS Glue job. I am aware of the similar question and the possible solution mentioned here. I have tried it and it doesn't seem to work. Here is my sample code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = glueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://bucket-name/parquet/2021/02/15/15/"]})
partitioned_df = df.toDF().repartition(1)
partitioned_dynamic_df = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_df")
datasink0 = glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df, connection_type="s3", connection_options={'paths': ["s3://bucket-name/output/"]}, format="parquet")
job.commit()
I have printed partitioned_dynamic_df and verified it is the combined DataFrame of all the parquet files.
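For reference, this is roughly the check I ran (just a sketch; count() and printSchema() are standard DynamicFrame methods):
print(partitioned_dynamic_df.count())   # total rows across all input parquet files
partitioned_dynamic_df.printSchema()    # combined schema of the merged frame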
But I keep getting this error message and don't know how to resolve it:
Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.pyWriteDynamicFrame.
: java.lang.IllegalArgumentException: Expected exactly one path to be specified, but got:
    at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:427)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:535)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:522)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/job-name", line 33, in <module>
    datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df, connection_type="s3", connection_options={'paths': ["s3://bucket-name/output/"]}, format="parquet")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Expected exactly one path to be specified, but got: '
The data format is something like this:
{"lead:objectid": 21345, "contactdetails:name": "Ruben Dias", "contactdetails:contactnumber": 9838942345}
{"lead:objectid": 41335, "contactdetails:name": "Nick Pope", "contactdetails:contactnumber": 9228672345}
{"lead:objectid": 4132, "contactdetails:name": "Edison Cavani", "contactdetails:contactnumber": 9228633345}
{"lead:objectid": 21335, "contactdetails:name": "James Justin", "contactdetails:contactnumber": 9838672345}
{"lead:objectid": null, "contactdetails:name": "James Maddison", "contactdetails:contactnumber": null}
{"lead:objectid": null, "contactdetails:name": "Jack Grealish", "contactdetails:contactnumber": null}
{"lead:objectid": 3214, "contactdetails:name": "Harry Kane", "contactdetails:contactnumber": null}
{"lead:objectid": 34143, "contactdetails:name": null, "contactdetails:contactnumber": null}
Any help/suggestions?
Upvotes: 4
Views: 8277
Reputation: 508
Found the fault. I had written:
datasink0 = glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df, connection_type="s3", connection_options={'paths': ["s3://bucket-name/output/"]}, format="parquet")
This passes a list under the 'paths' key in connection_options. For writing, a single 'path' (a string, not a list) should be supplied. It should be:
datasink0 = glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df, connection_type="s3", connection_options={'path': "s3://bucket-name/output/"}, format="parquet")
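As an aside, a plain Spark write would achieve the same result without round-tripping through a DynamicFrame. This is only a sketch assuming the same df from the question; coalesce(1) merges partitions without the full shuffle that repartition(1) triggers:
# Alternative sketch: write the merged data with the plain Spark DataFrame writer.
merged_df = df.toDF().coalesce(1)
merged_df.write.mode("overwrite").parquet("s3://bucket-name/output/")   # "overwrite" replaces anything already under the output prefix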
Upvotes: 5