Reputation: 4776
I have many parquet files inside an S3 folder. Each one has 'A', 'B', 'C' columns. The 'A' and 'B' columns have string data type, but the 'C' column is Float
in some files and Double
in others. I want to merge these parquet files into one larger file. I am doing this merge inside AWS Glue using PySpark.
When I try to write the DataFrame to S3 using
output_s3_path = "s3://new_path/"
df.write.mode("overwrite").parquet(output_s3_path)
I am getting the following error:
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableFloat cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableDouble at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setDouble(SpecificInternalRow.scala:284)
I tried the following approaches:
spark.read.option("mergeSchema", "true")
to merge the schemas, e.g.
s3_df = spark.read.option("mergeSchema", "true").parquet("s3://path/")
spark.read.option("overwriteSchema", "true")
to overwrite the schema with the new data type
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
to disable the vectorized Parquet reader
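Putting those options together, the full read/write sequence I tried looks roughly like this (the bucket paths are placeholders):
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
s3_df = spark.read.option("mergeSchema", "true").parquet("s3://path/")
output_s3_path = "s3://new_path/"
s3_df.write.mode("overwrite").parquet(output_s3_path)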
But none of them worked. Setting the schema explicitly would not be a good solution for me, as I expect to merge many different sets of parquet files with the same job.
How can I solve this issue?
Upvotes: 1
Views: 632
Reputation: 112
Try casting the 'C' column to double in PySpark:
df = df.withColumn("C", F.col("C").cast("double"))
A full Glue job would look like this:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
# Initialize Glue context and job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3 paths
input_path = "s3://your-input-bucket/path-to-parquet-files/"
output_path = "s3://your-output-bucket/path-to-merged-parquet-file/"
# Read the parquet files into a DynamicFrame
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [input_path]},
    format="parquet"
)
# Convert DynamicFrame to DataFrame
df = dyf.toDF()
# Cast the 'C' column to Double to ensure consistency
df = df.withColumn("C", F.col("C").cast("double"))
# Convert DataFrame back to DynamicFrame
dyf_cleaned = DynamicFrame.fromDF(df, glueContext, "dyf_cleaned")
# Write the merged data back to S3
glueContext.write_dynamic_frame.from_options(
    frame=dyf_cleaned,
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet"
)
# Commit the job
job.commit()
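If you want to reuse the same job for different sets of parquet files without hard-coding column names, one option is to upcast every float column to double after converting to a DataFrame. This is only a sketch and assumes each mixed-type column resolves to a plain float type once read:
from pyspark.sql.types import FloatType
# Upcast every float column to double so the merged schema is consistent
for field in df.schema.fields:
    if isinstance(field.dataType, FloatType):
        df = df.withColumn(field.name, F.col(field.name).cast("double"))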
Upvotes: 0