MasterHamster

Reputation: 21

Accessing S3 using the s3a protocol from Spark with Hadoop version 2.7.2

I’m trying to access S3 (via the s3a protocol) from PySpark (version 2.2.0) and I’m having some difficulty.

I’m using the Hadoop and AWS SDK packages:

pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2

Here is what my code looks like:

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

rdd = sc.textFile('s3a://spark-test-project/large-file.csv')
print(rdd.first())

I get this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1361, in first
    rs = self.take(1)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 1313, in take
    totalParts = self.getNumPartitions()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/rdd.py", line 385, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/attazadeh/DataEngine/env/lib/python3.4/site-packages/pyspark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.partitions.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 32750D3DED4067BD, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: jAhO0tWTblPEUehF1Bul9WZj/9G7woaHFVxb8gzsOpekam82V/Rm9zLgdLDNsGZ6mPizGZmo6xI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Is this a bug in the AWS Java SDK? I’m new to Spark, so I don’t know if there’s a way to get more detailed error information out of AWS than AWS Error Code: null.

Upvotes: 2

Views: 7082

Answers (3)

Manish

Reputation: 11

I was trying to read a Parquet file from S3 using PySpark, and this worked for me:

from pyspark.sql import SparkSession

# Pull in matching hadoop-aws/hadoop-common artifacts so the s3a filesystem is on the classpath
spark = SparkSession \
    .builder \
    .config('spark.master', 'local') \
    .config('spark.app.name', 's3app') \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4') \
    .getOrCreate()

sc = spark.sparkContext

# Hand the credentials to the S3A connector
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'access-key')
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', 'secret-key')

df = spark.read.format('parquet').load('s3a://path-to-s3')
df.show()

Upvotes: 0

stevel

Reputation: 13480

"Bad request" is the message to fear from S3, it means "This didn't work and we won't tell you why".

There's a whole section on troubleshooting S3A in the docs.
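That section also covers turning up client-side logging, which is the quickest way to see what the connector and the SDK are actually doing. A rough sketch of the log4j.properties entries (logger names as used by the S3A connector and the AWS Java SDK; tune the levels to taste):

log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
log4j.logger.com.amazonaws=DEBUG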

If your bucket is hosted somewhere that only supports the S3 "v4" auth protocol (Frankfurt, London, Seoul), then you need to set the fs.s3a.endpoint field to that of the specific region; the docs have the details.
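From pyspark that is just another Hadoop configuration setting; a minimal sketch, assuming a bucket in Frankfurt/eu-central-1 (substitute your bucket's own regional endpoint):

sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")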

Otherwise, try using s3a://landsat-pds/scene_list.gz as a source. It's a public CSV file that doesn't need authentication. If you can't see it, then you are in serious trouble.
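A minimal check from the pyspark shell, reusing the SparkContext from the question:

rdd = sc.textFile('s3a://landsat-pds/scene_list.gz')
print(rdd.first())

If that read succeeds but your own bucket still fails with a 400, the problem is most likely the region/endpoint or auth setup rather than the packaging.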

Upvotes: 0

Bob Swain

Reputation: 3182

For what it's worth, I have this line in my spark-defaults.conf file on AWS:

spark.jars.packages com.amazonaws:aws-java-sdk:1.11.99,org.apache.hadoop:hadoop-aws:2.7.2

I also made sure that the security group I used when setting up my EC2 instance has access to S3.

After those two things, I've had no issues reading files from S3:

%pyspark
df = spark.read.csv("s3a://my_bucket/name/")

Alternatively, if you use AWS EMR, you should be able to access S3 right out of the box:

%pyspark
df = spark.read.csv("s3://my_bucket/name/")

Upvotes: 0
