Reputation: 477
I am trying to access gzip files on AWS S3 from Spark, using the very simple script below. I started with an IAM user that has access permissions to the S3 bucket. Then I created an EC2 instance, installed Python and Spark, and set up a spark.properties file; I only copied the jar files and didn't go through a full Hadoop installation. I then realized I need an IAM role for EC2 instances to access S3, so I created one, attached an access policy, and attached the role to the EC2 instance (I did not restart the instance). What am I doing wrong? My goal is to get comfortable with PySpark in a standalone environment before I move on to EMR, clusters, etc.
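For what it's worth, one quick way to confirm which AWS identity is actually in effect on the instance is an STS get-caller-identity call (shown only as an illustration; this check is not part of my script):

import boto3

# Print the account, ARN, and user ID of whatever credentials boto3 resolves
# (IAM user keys, environment variables, or the EC2 instance profile).
print(boto3.client('sts').get_caller_identity())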
I execute Pyspark as:
spark-submit --properties-file spark.properties S3Access.py
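For reference, since my actual spark.properties is not reproduced here, a minimal properties file for s3a access generally looks something like the sketch below (the paths are placeholders, not my real file; the jar versions assume the Hadoop 2.7 build of Spark):

# Illustrative only -- adjust paths to wherever the jars were copied
spark.driver.extraClassPath      /opt/spark-jars/hadoop-aws-2.7.3.jar:/opt/spark-jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath    /opt/spark-jars/hadoop-aws-2.7.3.jar:/opt/spark-jars/aws-java-sdk-1.7.4.jar
spark.hadoop.fs.s3a.impl         org.apache.hadoop.fs.s3a.S3AFileSystem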
My Pyspark code:
import os.path
from pathlib import Path
from pyspark import SparkContext, SparkConf
from boto3.session import Session
ACCESS_KEY = 'blah blah'
SECRET_KEY = 'blah blah'
BUCKET_NAME = 'bucket'
PREFIX = 'folder-name/'
MAX_FILES_READ = 3
if __name__ == "__main__":
    # Use Boto to connect to S3 and get a list of objects from a bucket
    session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
    s3 = session.resource('s3')

    # call S3 to list current buckets
    my_bucket = s3.Bucket(BUCKET_NAME)

    # Get a Spark context and use it to parallelize the keys
    conf = SparkConf().setAppName("MyFirstProcessingApp")
    sc = SparkContext(conf=conf)

    index = 0
    for s3_file in my_bucket.objects.filter(Prefix=PREFIX):
        if 'gz' in s3_file.key:
            index += 1
            print("Found file: ", s3_file.key)
            if index == MAX_FILES_READ:
                break
            fileLocation = "s3a://" + BUCKET_NAME + '/path-to-file/path/filename.txt'
            print("file location: ", fileLocation)
            s3File = sc.textFile(fileLocation)
            count = s3File.count()
Error I get:
ubuntu@ip-172-31-57-35:/opt/iqmedia$ spark-submit --properties-file spark.properties S3Access.py
19/07/22 01:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found file: inscape/content/2019-01-01/2019-01-01-07.0000_part_00.gz
file location: s3a://bucket/folder/filename.txt
Traceback (most recent call last):
File "/opt/iqmedia/S3Access.py", line 42, in <module>
count = s3File.count()
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 35CB499B1AE1A8A6, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: BHnH2DO+HuvARy9d3hdfCrtc2ToSJ7DQ/6ODSymLfDOZF7G80rpJqyyvkVuXdAPsR2a9gjqxWX8=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
What am I doing wrong? Any help is appreciated. Thanks
Update, July 22: I did everything as suggested except running the notebook. I ran the script again and got the following error. Any thoughts?
File "/opt/iqmedia/S3Access.py", line 39, in <module>
print(s3File.count())
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/opt/apache-spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Update, July 22 evening: Well, I got past everything; I think I was using the wrong Hadoop jar file. My .bashrc now looks like this, even though I don't have the notebook working.
export JAVA_HOME=/usr
export SPARK_HOME=/opt/apache-spark/spark-2.4.3-bin-hadoop2.7
export HADOOP_HOME=/opt/apache-spark/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:/home/ubuntu/anaconda3/bin:$HADOOP_HOME/bin
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_PYTHON=python2.7
export PYSPARK_DRIVER_PYTHON=python2.7
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
Now I am able to run a simple Spark command to measure the size of a file, but I am still getting the warning below.
NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Any thoughts?
Upvotes: 2
Views: 10064
Reputation: 511
@NetRocks, you did not say what your platform is, but here is a guide on PySpark installation on Windows and AWS S3 access configuration.
Judging by the error you are getting, you are missing hadoop-aws.jar.
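For example, one way to get it onto the classpath at submit time (a sketch; the 2.7.3 version assumes the spark-2.4.3-bin-hadoop2.7 build from the question) is to let spark-submit resolve the connector and its matching AWS SDK from Maven:

spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.3 --properties-file spark.properties S3Access.py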
Upvotes: 0
Reputation: 773
Following @Bitswazsky's answer and using the code below:
import os
import os.path
import findspark
findspark.init()
from pathlib import Path
from pyspark.sql import SparkSession
from boto3.session import Session
AWS_ACCESS_KEY_ID="ASI...XHMRZ"
AWS_SECRET_ACCESS_KEY="1FCz...M4pWXUcP"
AWS_SESSION_TOKEN="IQoJb3JpZ2l.....ZOBs7pIzzy7QXc0UAhyAYLtUvzHcB1CO98Jwk3PtGNkS4baykkUssDxOkvp01U/8F7pu6Vog=="
BUCKET_NAME = 'my-sandbox'
profile_name='3716-Developer'
PREFIX = 'root'
MAX_FILES_READ = 3
# Use Boto to connect to S3 and get a list of objects from a bucket
session = Session(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
s3 = session.resource('s3')
# call S3 to list current buckets
my_bucket = s3.Bucket(BUCKET_NAME)
spark = SparkSession.builder.appName('MyFirstProcessingApp').master('local[2]').getOrCreate()
sc = spark.sparkContext
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
df = spark.read.format("parquet").load("s3a://my-sandbox/tmp/cust_df_f_zip.parquet")
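One note on the credentials above: access keys starting with ASI... together with an AWS_SESSION_TOKEN are temporary STS credentials, and the snippet only hands the access key and secret key to s3a. A sketch of passing the token as well, assuming a hadoop-aws build of roughly 2.8 or newer (the 2.7.3 jar referenced in spark-defaults.conf below does not support these settings):

# Assumes hadoop-aws >= 2.8, where these s3a options exist
spark._jsc.hadoopConfiguration().set("fs.s3a.session.token", AWS_SESSION_TOKEN)
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
                                     "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")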
On Windows WSL ($ uname -a: Linux DESKTOP-3DKM78D 4.19.128-microsoft-standard #1 SMP Tue Jun 23 12:58:10 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux), running Spark version 2.4.7, I get:
Traceback (most recent call last):
File "spark_s3_access.py", line 31, in <module>
df = spark.read.format("parquet").load("s3a://my-sandbox/tmp/cust_df_f_zip.parquet")
File "/opt/spark/python/pyspark/sql/readwriter.py", line 166, in load
return self._df(self._jreader.load(path))
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o38.load.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 9B95DBA6D53DA18A, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: //lgkjl8/vZMxVqfX7jc1wfFEaXbUH+XuAjolL4SnCzATl1UzylUcoWe8cH4LggvDMza4cXdATs=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
The error is raised on the last line, where the parquet file is read: df = spark.read...
(cor_poc) syoon@DESKTOP-$ echo $HADOOP_HOME
/opt/spark
(cor_poc) syoon@DESKTOP$ echo $SPARK_HOME
/opt/spark
(cor_poc) syoon@DESKTOP$ echo $PATH
/home/syoon/.nvm/versions/node/v14.14.0/bin:...:/usr/lib/jvm/java-15-oracle/db/bin:/opt/spark/bin
(cor_poc) syoon@DESKTOP-3DKM78D:~/Prj/processing_rqsts/paul_df1_01$ tail /opt/spark/conf/spark-defaults.conf
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.driver.extraClassPath :/opt/spark/jars/hadoop-aws-2.7.3.jar:/opt/spark/jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath :/opt/spark/jars/hadoop-aws-2.7.3.jar:/opt/spark/jars/aws-java-sdk-1.7.4.jar
But I do have permission:
$ aws s3 ls s3://my-sandbox/tmp/
2020-11-02 13:51:36 0
2020-11-02 20:52:46 18283514 cust_df.parquet
Any help would be greatly appreciated!
Upvotes: 0
Reputation: 51
Fixing HADOOP_HOME, as @NetRocks later tried, solved it in my case. If you are using Spark bundled with Hadoop, make sure you set SPARK_HOME and HADOOP_HOME to the same directory.
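For example, with the /opt/spark layout used in the post above (adjust to your own install):

export SPARK_HOME=/opt/spark
export HADOOP_HOME=$SPARK_HOME   # point both at the same bundled distribution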
Upvotes: 0
Reputation: 4698
I'd suggest you go via the route I'm describing below, because I've faced issues with s3 and pyspark in the past, and whatever I did wasn't good for my head, or for the wall.

1. Install Spark locally and add the settings it needs to ~/.bash_profile. Should be similar for other OSs.
2. Download hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar and put them in a local directory (e.g. /users/me/test-spark). One thing to remember here is that if you use any other version, you'll be in agony.
3. Add these two lines to the spark-defaults.conf file, which you can find inside the Spark installation path:
spark.driver.extraClassPath :/users/me/test-spark/hadoop-aws-2.7.3.jar:/users/me/test-spark/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath :/users/me/test-spark/hadoop-aws-2.7.3.jar:/users/me/test-spark/aws-java-sdk-1.7.4.jar
4. Set the environment variables in ~/.bash_profile (PYTHONPATH, PYSPARK_PYTHON, PYSPARK_DRIVER_PYTHON, PYSPARK_DRIVER_PYTHON_OPTS). You can find tutorials online that show how to set these values.

Once you've done all these prerequisites, you can move to the next stage:
import os.path
from pathlib import Path
from pyspark.sql import SparkSession
from boto3.session import Session
ACCESS_KEY = 'blah blah blah?'
SECRET_KEY = 'blah blah blah!'
BUCKET_NAME = 'my-leaky-bucket'
PREFIX = 'root'
MAX_FILES_READ = 3
# Use Boto to connect to S3 and get a list of objects from a bucket
session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
s3 = session.resource('s3')
# call S3 to list current buckets
my_bucket = s3.Bucket(BUCKET_NAME)
spark = SparkSession.builder.appName('MyFirstProcessingApp').master('local[2]').getOrCreate()
sc = spark.sparkContext
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
index = 0
for s3_file in my_bucket.objects.filter(Prefix=PREFIX):
    if 'gz' in s3_file.key:
        index += 1
        print("Found file: {file}".format(file=s3_file.key))
        if index == MAX_FILES_READ:
            break
        fileLocation = "s3a://{bucket}/{file}".format(bucket=BUCKET_NAME, file=s3_file.key)
        print("file location: {loc}".format(loc=fileLocation))
        s3File = sc.textFile(fileLocation)
        print(s3File.count())
        print('\n')
Additional note: it's kind of amazing that if you create an EMR cluster and attach a Jupyter notebook to it (from the AWS web UI), it takes care of everything. You can simply copy-paste the code snippet into that notebook and you're good to go.
Upvotes: 3