Prateek Jain

Reputation: 196

Writing a Spark DataFrame as an Apache Hudi table to an S3 bucket which has Object Lock

I have some datasets (CSV and Parquet files) that I want to transform and write as a Hudi table to my S3 bucket, which has Object Lock enabled.

From the official PySpark documentation, I understand that it currently doesn't officially support Content-MD5 for S3 with Object Lock, but I am wondering if someone can help me with this, since it could be a useful case for larger organisations where S3 buckets always have Object Lock enabled by policy. Note: I do not have the option to create a bucket without Object Lock.
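For context, the Object Lock setting on the bucket can be confirmed with boto3 (the bucket name below is a placeholder); with a default retention rule, every PutObject request must carry a Content-MD5 or x-amz-checksum-* header:

import boto3

# Placeholder bucket name; prints the bucket's Object Lock configuration,
# e.g. a default retention mode/period that applies to every PutObject.
s3 = boto3.client('s3')
response = s3.get_object_lock_configuration(Bucket='<s3_bucket>')
print(response['ObjectLockConfiguration'])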

I have already written the following code:

from methods.fast_hudi import FastHudi
import os
from pyspark.sql.functions import concat_ws, md5
import zipfile
import hashlib

with zipfile.ZipFile(r'./data/package.zip', 'r') as zip_ref:
    zip_ref.extractall(r'./data/package')

os.environ['AWS_ACCESS_KEY'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_SESSION_TOKEN'] = ''

spark_config = {
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,'
                           'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,'
                           'org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1,'
                           'org.apache.spark:spark-avro_2.13:3.4.0,'
                           'org.apache.calcite:calcite-core:1.34.0,'
                           'com.amazonaws:aws-java-sdk-bundle:1.12.486',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
    'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
    # NOTE: this key appears twice in the dict; Python keeps only the second
    # value, so the ProfileCredentialsProvider entry below wins.
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'spark.hadoop.fs.s3a.access.key': os.getenv('AWS_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.secret.key': os.getenv('AWS_SECRET_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.session.token': os.getenv('AWS_SESSION_TOKEN', None),
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.profile.ProfileCredentialsProvider',
    'spark.driver.memory': '16g',
    'spark.hadoop.fs.s3a.fast.upload': 'true',
    'spark.hadoop.fs.s3a.upload.buffer': 'bytebuffer'  # note: the S3A property is usually fs.s3a.fast.upload.buffer
}

spark = FastHudi('Showroom Variants Bronze Layer - Hudi', spark_config)
base_data = spark.read(
    'csv',
    'data/package/showroom_variant.csv'
)

base_data.show()

hudi_table_name = 'showroom_variants'

hoodie_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'showroom_variant_id',
    'hoodie.datasource.write.precombine.field': 'global_brand_name',
    'hoodie.datasource.write.partitionpath.field': 'global_sales_parent_name,global_brand_name,body_type',
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.deltastreamer.source.dfs.listing.max.fileid': '-1',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

spark.write_as_hudi(
    base_data,
    'overwrite',
    'org.apache.hudi',
    f's3a://<s3_bucket>/datalake/{hudi_table_name}',
    hoodie_options
)

Here are the classes used in the imports:

# methods/fast_spark.py
from pyspark.sql import SparkSession
from functools import reduce

class FastSpark:
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        self.spark_obj = None
        self.app_name = app_name
        self.spark_config = spark_config

    def get_or_create_session(self) -> SparkSession:

        if self.spark_config is None:
            return SparkSession.builder \
                .appName(self.app_name) \
                .getOrCreate()
        else:
            spark_session = reduce(
                lambda x, y: x.config(y[0], y[1]),
                self.spark_config.items(),
                SparkSession.builder.appName(self.app_name)
            )

            return spark_session.getOrCreate()

    def read(self, read_format: str, load_path: str, read_options: dict = None):
        self.spark_obj = self.get_or_create_session()
        if read_options is None:
            return self.spark_obj.read.format(read_format) \
                .load(load_path)
        else:
            read_obj = reduce(lambda x, y: x.option(y[0], y[1]), read_options.items(),
                              self.spark_obj.read.format(read_format))
            return read_obj.load(load_path)

# methods/fast_hudi.py
from methods.fast_spark import FastSpark
import os
from functools import reduce

class FastHudi(FastSpark):

    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        super().__init__(app_name, spark_config)
        self.AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', None)
        self.AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', None)
        self.spark = self.get_or_create_session()

        if self.AWS_ACCESS_KEY is None or self.AWS_SECRET_ACCESS_KEY is None:
            raise ValueError("Both of the following variables need to be set: AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY")

    def write_as_hudi(self, write_df, write_mode: str, write_format: str = 'org.apache.hudi',
                      target_path: str = None, hoodie_options: dict = None, header=None):

        write_df.write.format(write_format) \
            .options(**hoodie_options) \
            .mode(write_mode) \
            .save(target_path, header=header)

    def stop_spark(self):
        self.spark.stop()

Library Version: pyspark == 3.4.0

When I try to run the above code, it throws this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o121.save. : org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on datalake/showroom_variants: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <request_id>; Proxy: null), S3 Extended Request ID: <some_id>:InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <extended_request_id>; Proxy: null)

at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:119)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:318)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:293)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:4532)
at org.apache.hadoop.fs.s3a.S3AFileSystem.access$1900(S3AFileSystem.java:259)
at org.apache.hadoop.fs.s3a.S3AFileSystem$MkdirOperationCallbacksImpl.createFakeDirectory(S3AFileSystem.java:3461)
at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:121)
at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:45)
at org.apache.hadoop.fs.s3a.impl.ExecutingStoreOperation.apply(ExecutingStoreOperation.java:76)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:3428)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2388)
at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:481)
at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1201)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:323)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: DKTYYJHRJGJZFQ1X; S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=; Proxy: null), S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5470)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5417)
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:422)
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6551)
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1862)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1822)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2877)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2874)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$32(S3AFileSystem.java:4534)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
... 63 more

EDIT (2024-02-18):

I am open to workarounds for this, or even to a different approach/framework than Apache Hudi. The goal is to build efficient data lakes, as I could with Apache Hudi, but Amazon S3 Object Lock seems to be a blocker in this case. Open to suggestions.
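For illustration, a rough, untested sketch of one possible workaround: write the Hudi table to a local staging path first, then upload the resulting files with boto3, which can attach an x-amz-checksum-* header explicitly via ChecksumAlgorithm. The local path, bucket and prefix below are placeholders, and later incremental Hudi commits would still have to write to S3 directly, so this would only cover a one-off bootstrap:

import os
import boto3

# Hypothetical staging upload: Hudi writes the table to local disk first, then
# boto3 pushes the files to S3 with an explicit checksum so the Object Lock
# bucket accepts each PUT. Paths and bucket name are placeholders.
local_root = './staging/showroom_variants'   # local target_path used in write_as_hudi
bucket = '<s3_bucket>'
prefix = f'datalake/{hudi_table_name}'

s3 = boto3.client('s3')
for dirpath, _, filenames in os.walk(local_root):
    for name in filenames:
        local_file = os.path.join(dirpath, name)
        key = f"{prefix}/{os.path.relpath(local_file, local_root).replace(os.sep, '/')}"
        # ChecksumAlgorithm makes botocore send an x-amz-checksum-sha256 header,
        # which satisfies the "Content-MD5 OR x-amz-checksum-" requirement.
        s3.upload_file(local_file, bucket, key,
                       ExtraArgs={'ChecksumAlgorithm': 'SHA256'})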

Upvotes: 0

Views: 675

Answers (2)

parisni

Reputation: 1152

You might give Apache Iceberg a try; it does not rely on the Hadoop file system to deal with S3, but on the latest AWS SDK.

While the documentation does not mention Object Lock, it might just work, or at least the implementation should be more straightforward than in Hadoop.
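A hedged sketch of what that could look like in PySpark (untested; the package versions, catalog name, Glue database and warehouse path are assumptions, and whether S3FileIO sends the checksum headers Object Lock requires would still need to be verified):

from pyspark.sql import SparkSession

# Untested sketch: Iceberg with S3FileIO (AWS SDK v2) instead of S3A/Hadoop.
# Versions, catalog name, Glue database and warehouse path are assumptions.
spark = (
    SparkSession.builder
    .appName('Showroom Variants Bronze Layer - Iceberg')
    .config('spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.3,'
            'org.apache.iceberg:iceberg-aws-bundle:1.4.3')
    .config('spark.sql.extensions',
            'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.lake', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.lake.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
    .config('spark.sql.catalog.lake.warehouse', 's3://<s3_bucket>/datalake')
    .config('spark.sql.catalog.lake.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .getOrCreate()
)

df = spark.read.option('header', 'true').csv('data/package/showroom_variant.csv')
# Assumes a 'bronze' database already exists in the Glue catalog.
df.writeTo('lake.bronze.showroom_variants').createOrReplace()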

Upvotes: 0

stevel

Reputation: 13430

The S3A codebase has never been tested against an Object Lock bucket, as far as I'm aware. File a HADOOP JIRA with the stack trace. Don't expect any immediate response, and anything that comes in will be for hadoop-3.4.x only... for which there are not yet any Spark releases.

More likely: we'll note the error message in the documentation as "not currently supported".

Update: created the JIRA, https://issues.apache.org/jira/browse/HADOOP-19080 ("S3A to support writing to object lock buckets").

Upvotes: 0
