ahajib

Reputation: 13510

Reading data from S3 using pyspark throws java.lang.NumberFormatException: For input string: "100M"

I am using the following code to read some json data from S3:

df = spark_sql_context.read.json("s3a://test_bucket/test.json")
df.show()

The above code throws the following exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.json.
: java.lang.NumberFormatException: For input string: "100M"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1538)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I have read several other SO posts on this topic (like this one or this) and have done everything they mention, but nothing seems to fix my issue.

I am using spark-2.4.4-bin-without-hadoop and hadoop-3.1.2. As for the jar files, I've got:

/home/my_project/jars/aws-java-sdk-bundle-1.11.199.jar
/home/my_project/jars/hadoop-aws-3.0.0.jar
/home/my_project/jars/hadoop-common-3.0.0.jar

Also, I am using the following spark-submit command to run the code:

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit \
    --conf spark.app.name=read_json --master yarn --deploy-mode client --num-executors 2 \
    --executor-cores 2 --executor-memory 2G --driver-cores 2 --driver-memory 1G \
    --jars /home/my_project/jars/aws-java-sdk-bundle-1.11.199.jar,/home/my_project/jars/hadoop-aws-3.0.0.jar,/home/my_project/jars/hadoop-common-3.0.0.jar \
    --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.rpc.askTimeout=600s" \
    /home/my_project/read_json.py

Anything I might be missing here?

Upvotes: 11

Views: 7128

Answers (2)

ahajib

Reputation: 13510

I am posting what I ended up doing to fix the issue for anyone who might see the same exception:

I added hadoop-aws to HADOOP_OPTIONAL_TOOLS in hadoop-env.sh, and I also removed all of the s3a configuration from Spark except the access/secret keys, and everything worked. My code before the changes:

from pyspark import SparkConf

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.type", "BLOCK") \
       .set("spark.speculation", "false") \
       .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider") \
       .set("com.amazonaws.services.s3.enableV4", "true")

# Some other configs

spark_context._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", s3_key)

spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", s3_secret)

spark_context._jsc.hadoopConfiguration().set("fs.s3a.multipart.size", "104857600")

And after:

# Setup the Spark Process
conf = SparkConf() \
       .setAppName(app_name) \
       .set("spark.hadoop.mapred.output.compress", "true") \
       .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
       .set("spark.hadoop.mapred.output.compression.type", "BLOCK") \
       .set("spark.speculation", "false")

# Some other configs

spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", s3_key)

spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", s3_secret)

That probably means it was a classpath issue: hadoop-aws wasn't getting added to the classpath, so under the covers it was defaulting to some other implementation of S3AFileSystem.java. Hadoop and Spark are a huge pain in this area because there are so many different places and ways to load things, and Java is particular about the order as well; if things don't load in the right order, it will just go with whatever was loaded last. Hope this helps others facing the same issue.
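
If you want to confirm which jar the class is actually coming from, one quick check (a minimal sketch, assuming a live spark_context; the variable names are only illustrative) is to ask the JVM where it loaded S3AFileSystem from:

# Sketch: print the jar that provided the S3AFileSystem class on the driver
jvm = spark_context._jvm
s3a_class = jvm.java.lang.Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem")
print(s3a_class.getProtectionDomain().getCodeSource().getLocation().toString())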

Upvotes: 0

dk-na

Reputation: 141

From the stack trace, the error is thrown while one of the configuration options is being read, so the issue is with a default configuration value that this code path requires to be in plain numeric format.
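
The failing call is visible in the trace itself: Configuration.getLong hands the raw string to Long.parseLong, which cannot handle the "100M" suffix. A minimal sketch of that parse (assuming a live spark_context for access to the JVM):

# Sketch: the parse that fails in the stack trace vs. the plain numeric form
jvm = spark_context._jvm
try:
    jvm.java.lang.Long.parseLong("100M")              # what the older S3A code path effectively attempts
except Exception as err:
    print(err)                                        # java.lang.NumberFormatException: For input string: "100M"
print(jvm.java.lang.Long.parseLong("104857600"))      # the plain byte count parses fine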

In my case the error was resolved after I added the following configuration parameter to the spark-submit command:

--conf fs.s3a.multipart.size=104857600

See Tuning S3A Uploads.
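
If you would rather set it from the PySpark code than on the spark-submit command line, a minimal sketch (assuming the SparkSession has not been created yet; the app name is just a placeholder) uses Spark's spark.hadoop.* passthrough so the value reaches the S3A Hadoop configuration:

from pyspark.sql import SparkSession

# Sketch: pass the numeric multipart size through to the S3A Hadoop configuration
spark = (
    SparkSession.builder
    .appName("read_json")
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
    .getOrCreate()
)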

Upvotes: 1
