Renan Nogueira

Reputation: 155

How to use two AWS credentials in PySpark

I'm reading multiple files from a bucket that uses one set of credentials, and I need to write these files to another bucket in a different AWS account.

I'm changing the AWS credentials in each function, but during the execution of the write function I get an error about reading the files from function 1, as if the credentials had been switched while Spark was still reading those files.

What could be done to solve this?

from pyspark.sql import SparkSession

def create_spark_session():

    spark = (SparkSession.builder
             .config("spark.hadoop.fs.s3a.fast.upload", True)
             .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
             .config("spark.sql.autoBroadcastJoinThreshold", -1)
             .config("spark.sql.shuffle.partitions", "1000")
             .config("spark.sql.adaptive.enabled", "true")
             .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
             .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "268435456")
             .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
             .enableHiveSupport().getOrCreate()
             )

    spark.sparkContext.setLogLevel("WARN")

    return spark


def read_from_bucket1(spark):
    print('Running reading bucket 1')

    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'credential_bucket_1')
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'credential_bucket_1')

    spark.read.parquet('s3a://bucket1/path/2022/*/*/').registerTempTable('temp_table')


def write_to_bucket2(spark):
    print('Running writing bucket 2')

    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'credential_bucket_2')
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'credential_bucket_2')

    (spark.sql('select cast(col as date) from temp_table')
    .write.option("compression","GZIP")
    .mode("overwrite")
    .save(f"s3a://bucket2/path_in_other_aws/"))


spark = create_spark_session()
read_from_bucket1(spark)
write_to_bucket2(spark)

Upvotes: 0

Views: 941

Answers (1)

Diegolog

Reputation: 348

You can specify the credentials per bucket instead of for the whole file system, using the info on this page. Effectively it should look something like this:

For each bucket:

sparkConf.set('spark.hadoop.fs.s3a.bucket.<your-bucket-name>.access.key', '<access-key-for-that-bucket>')
sparkConf.set('spark.hadoop.fs.s3a.bucket.<your-bucket-name>.secret.key', '<secret-key-for-that-bucket>')
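
Applied to the setup in the question, a minimal sketch could look like the following. It assumes the buckets are literally named bucket1 and bucket2 and uses placeholder strings for the keys:

from pyspark.sql import SparkSession

# Hadoop S3A per-bucket configuration:
# fs.s3a.bucket.<bucket-name>.access.key / .secret.key override the global
# fs.s3a.access.key / fs.s3a.secret.key for that bucket only.
spark = (SparkSession.builder
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
         # credentials for the source bucket (account 1)
         .config("spark.hadoop.fs.s3a.bucket.bucket1.access.key", "ACCESS_KEY_ACCOUNT_1")
         .config("spark.hadoop.fs.s3a.bucket.bucket1.secret.key", "SECRET_KEY_ACCOUNT_1")
         # credentials for the destination bucket (account 2)
         .config("spark.hadoop.fs.s3a.bucket.bucket2.access.key", "ACCESS_KEY_ACCOUNT_2")
         .config("spark.hadoop.fs.s3a.bucket.bucket2.secret.key", "SECRET_KEY_ACCOUNT_2")
         .getOrCreate())

# Reads and writes pick the right credentials from the bucket name, so there
# is no need to mutate hadoopConfiguration() between the read and the write.
df = spark.read.parquet("s3a://bucket1/path/2022/*/*/")
df.write.mode("overwrite").parquet("s3a://bucket2/path_in_other_aws/")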

(Optional but safer) I suggest creating profiles for each account in your AWS credentials file instead of putting the access keys in plain code.
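
If you go the profile route, one possible sketch is to resolve the keys from named profiles at runtime and feed them into the per-bucket settings, so no key appears in the code. This assumes boto3 is available, the profiles are named account1 and account2 in ~/.aws/credentials, they hold long-lived keys (no session token), and s3a_per_bucket_conf is just an illustrative helper:

import boto3
from pyspark.sql import SparkSession

def s3a_per_bucket_conf(bucket, profile):
    """Map the credentials of a named AWS profile to Hadoop S3A
    per-bucket settings for the given bucket."""
    creds = boto3.Session(profile_name=profile).get_credentials()
    return {
        f"spark.hadoop.fs.s3a.bucket.{bucket}.access.key": creds.access_key,
        f"spark.hadoop.fs.s3a.bucket.{bucket}.secret.key": creds.secret_key,
    }

builder = SparkSession.builder
for key, value in {**s3a_per_bucket_conf("bucket1", "account1"),
                   **s3a_per_bucket_conf("bucket2", "account2")}.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()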

Upvotes: 1
