Shivika
Shivika

Reputation: 229

How to write pyspark dataframe directly into S3 bucket?

I want to save pyspark dataframe directly into s3 bucket. I tried some options but getting error. Can someone help me to solve my problem? I created one sample pyspark dataframe and tried to save in S3 bucket directly.

I tried below code-

from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import last
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import col 
from pyspark.sql.functions import unix_timestamp
from functools import reduce
from pyspark.sql.session import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import max
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import abs, lit
#from __future__ import division
import sys
import mysql.connector
import traceback
import json
#from sqlalchemy import create_engine
import os
import math
import os.path
import datetime
from os import getpid
import pymysql.cursors
import time
import signal
from bs4 import BeautifulSoup
import pandas as pd
from pyspark.context import SparkConf
from collections import OrderedDict 
import multiprocessing
import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from threading import Thread
from functools import partial
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.application import MIMEApplication
from email import encoders
import smtplib
import shutil
import glob
from datetime import datetime, date
from pyspark.sql import Row



spark = SparkSession.builder.appName("app_name").getOrCreate()
print(spark.sparkContext._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion())
sc = spark.sparkContext
aws_access_key_id="*******"
aws_secret_access_key="********"
spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", aws_access_key_id)
spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
spark._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain')


df = spark.createDataFrame([Row(a=1, b=4., c='GFG1', d=date(2000, 8, 1),e=datetime(2000, 8, 1, 12, 0)),
                            Row(a=2, b=8., c='GFG2', d=date(2000, 6, 2),e=datetime(2000, 6, 2, 12, 0)),
                            Row(a=4, b=5., c='GFG3', d=date(2000, 5, 3),e=datetime(2000, 5, 3, 12, 0))])

print(df.show())
print(df.printSchema())

df.write.format('csv').option('header','true').save('s3a://******/testing_s3/emp.csv',mode='overwrite')

After running this code I am getting below error-

py4j.protocol.Py4JJavaError: An error occurred while calling o48.save.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: RNKTVM6JMDACZ16W, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: MS8lToBlzqSmn1YDdq6SPh7JC6aCKSROuldEz5x9LbsnQdxhKVEQriOpJz5KkCJPBnlk4KgsCkQ=

Please tell me what are the things I am missing in my script. Thanks in advance!!

Upvotes: 0

Views: 5473

Answers (1)

yogesh garud
yogesh garud

Reputation: 336

after creating the spark context use these lines to set the credentials

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) 

or

import pyspark
conf = ( 
    pyspark.SparkConf()
     .setAppName('app_name')
     .setMaster(SPARK_MASTER)
     .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY) 
     .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY) 
) 
sc = pyspark.SparkContext(conf=conf) 

Upvotes: 1

Related Questions