J.C Guzman

Reputation: 1334

Error when reading a CSV with PySpark from an AWS S3 bucket

I am working with a docker-compose file that runs a Spark cluster, with SPARK_VERSION="3.0.0" and HADOOP_VERSION="3.2".

All the files can be found in the following GitHub repository: https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker

I am trying to read a CSV file from AWS S3 with the following code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.874,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType, LongType,StructType


sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("pyspark-2")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

AWS_ACCESS_KEY_ID = "<AWS_ACCESS_KEY_ID>"
AWS_SECRET_ACCESS_KEY = "<AWS_SECRET_ACCESS_KEY>"

sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf=sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark.read.csv("s3a://<PATH>")

But I receive the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-24141d88d3e8> in <module>
----> 1 df = spark.read.csv("s3a://migrationawsbucket/allevents.csv")

/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.csv.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StreamCapabilities
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 41 more

I have tried the different solutions that I found in this link: How can I read from S3 in pyspark running in local mode?

--- UPDATE ---

I have tried changing the aws-java-sdk-bundle and hadoop-aws versions as stevel suggested, using different versions found at: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

I have tried using hadoop-aws-3.2.0.jar and aws-java-sdk-bundle-1.11.874.jar, following the answer found here: AWS EKS Spark 3.0, Hadoop 3.2 Error - NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException

But I get the same error.
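
A minimal check, assuming the `sc` SparkContext from the snippet above, to see which Hadoop version is actually on the driver's classpath (it has to match the hadoop-aws JAR version):

# Print the Hadoop version bundled with this Spark installation; the
# hadoop-aws JAR added via --packages must match this version exactly.
print(sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion())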

Upvotes: 0

Views: 1225

Answers (1)

stevel

Reputation: 13460

Fundamental mismatch between the hadoop-aws JAR and the AWS SDK. Afraid they don't mix and match.

Use mvnrepository to work out the exact AWS SDK version you need for the hadoop-* artifacts your Spark build has. And don't think about mixing hadoop-* artifact versions, as they are as tightly integrated as the Spark JARs are.

I'd recommend you upgrade to a version of Spark with hadoop-3.2.x or later artifacts, which are compatible with recent 1.11 AWS SDKs.
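
As a rough sketch (assuming your Spark build really does ship Hadoop 3.2.x, as the Docker image claims), pin hadoop-aws to that exact Hadoop version and let --packages pull in the aws-java-sdk-bundle that hadoop-aws itself declares, rather than picking an SDK version by hand:

import os

# Sketch only: hadoop-aws must match the Hadoop version on the classpath
# (check org.apache.hadoop.util.VersionInfo.getVersion()). Omitting an explicit
# aws-java-sdk-bundle lets dependency resolution fetch the SDK version this
# hadoop-aws release was built against.
# Set this before the SparkSession is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
)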

Upvotes: 1
