Reputation: 21
I’m trying to access s3 (s3a protocol) from pyspark (version 2.2.0) and I’m having some difficulty.
I’m using the Hadoop and AWS sdk packages.
pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2
Here is what my code looks like:
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
rdd = sc.textFile('s3a://spark-test-project/large-file.csv')
print(rdd.first().show())
I get this:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1361, in first
rs = self.take(1)
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1313, in take
totalParts = self.getNumPartitions()
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
return self._jrdd.partitions().size()
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.partitions.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 32750D3DED4067BD, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: jAhO0tWTblPEUehF1Bul9WZj/9G7woaHFVxb8gzsOpekam82V/Rm9zLgdLDNsGZ6mPizGZmo6xI=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Is this a bug with the AWS Java SDK? I’m new to spark, so I don’t know if there a way to get better logging information from AWS other than AWS Error Code: null
Upvotes: 2
Views: 7082
Reputation: 11
I was trying to read a parquet file from s3 using pyspark and this worked for me.
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.config('spark.master', 'local')\
.config('spark.app.name', 's3app')\
.config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4')\
.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'access-key')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'secret-key')
df = spark.read.format('parquet').load('s3a://path-to-s3')
df.show()
Upvotes: 0
Reputation: 13480
"Bad request" is the message to fear from S3, it means "This didn't work and we won't tell you why".
There's a whole section on troubleshooting S3A in the docs.
If your bucket is hosted someone which only supports the S3 "v4" auth protocol (frankfurt, london, seoul) then you need to set the fs.s3a.endpoint field to that of the specific region ... the doc has details.
Otherwise, try using s3a://landsat-pds/scene_list.gz
as a source. It's a public CSV File which doesn't need authentication. If you can't see it, then you are in serious trouble
Upvotes: 0
Reputation: 3182
For what it's worth, I have this line in my spark-defaults.conf file on aws:
spark.jars.packages com.amazonaws:aws-java-sdk:1.11.99,org.apache.hadoop:hadoop-aws:2.7.2
I also made sure that the security group I'm using when setting up my EC2 has access to s3.
After those two things, I've had no issues reading files from s3:
%pyspark
df = spark.read.csv("s3a://my_bucket/name/")
Alternatively, if you use AWS EMR, you should be able to access s3 right out of the box:
%pyspark
df = spark.read.csv("s3://my_bucket/name/")
Upvotes: 0