yazabara

Reputation: 1353

Spark SQL doesn't see HDFS files

I have a Spark application running on an AWS EMR cluster.

I've added a file to HDFS:

javaSparkContext.addFile(filePath, recursive);

The file exists (the logs below show it is readable/executable/writable), but I can't read its contents with the Spark SQL API:

 LOGGER.info("Spark working directory: " + path);
 File file = new File(path + "/test.avro");
 LOGGER.info("SPARK PATH:" + file);
 LOGGER.info("read:" + file.canRead());
 LOGGER.info("execute:" + file.canExecute());
 LOGGER.info("write:" + file.canWrite());
 Dataset<Row> load = getSparkSession()
                      .read()
                      .format(AVRO_DATA_BRICKS_LIBRARY)
                      .load(file.getAbsolutePath()); 

Here are the logs:

17/08/07 15:03:25 INFO SparkContext: Added file /mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/container_1502118042722_0001_01_000001/test.avro at spark://HOST:PORT/files/test.avro with timestamp 1502118205059
17/08/07 15:03:25 INFO Utils: Copying /mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/container_1502118042722_0001_01_000001/test.avro to /mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/spark-d5b494fc-2613-426f-80fc-ca66279c2194/userFiles-44aad2e8-04f4-420b-9b5e-a1ccde5db9ec/test.avro
17/08/07 15:03:25 INFO AbstractS3Calculator: Spark working directory: /mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/spark-d5b494fc-2613-426f-80fc-ca66279c2194/userFiles-44aad2e8-04f4-420b-9b5e-a1ccde5db9ec
17/08/07 15:03:25 INFO AbstractS3Calculator: SPARK PATH:/mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/spark-d5b494fc-2613-426f-80fc-ca66279c2194/userFiles-44aad2e8-04f4-420b-9b5e-a1ccde5db9ec/test.avro
17/08/07 15:03:25 INFO AbstractS3Calculator: read:true
17/08/07 15:03:25 INFO AbstractS3Calculator: execute:true
17/08/07 15:03:25 INFO AbstractS3Calculator: write:true

org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://HOST:PORT/mnt/yarn/usercache/hadoop/appcache/application_1502118042722_0001/spark-d5b494fc-2613-426f-80fc-ca66279c2194/userFiles-44aad2e8-04f4-420b-9b5e-a1ccde5db9ec/test.avro;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
    at odh.spark.services.algorithms.calculators.RiskEngineS3Calculator.getInputMembers(RiskEngineS3Calculator.java:76)
    at odh.spark.services.algorithms.calculators.RiskEngineS3Calculator.getMembersDataSets(RiskEngineS3Calculator.java:124)
    at odh.spark.services.algorithms.calculators.AbstractS3Calculator.calculate(AbstractS3Calculator.java:50)
    at odh.spark.services.ProgressSupport.start(ProgressSupport.java:47)
    at odh.spark.services.Engine.startCalculations(Engine.java:102)
    at odh.spark.services.Engine.startCalculations(Engine.java:135)
    at odh.spark.SparkApplication.main(SparkApplication.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

Upvotes: 0

Views: 597

Answers (2)

yazabara

Reputation: 1353

By default, files are stored under the /user/hadoop/ folder in HDFS. (You could rely on that and load against this constant, but it's better to use absolute paths.) The root cause here is that SparkContext.addFile only copies the file to each node's local disk, while spark.read().load() resolves a path without a scheme against fs.defaultFS, i.e. HDFS, so the local path from the logs does not exist there.
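To see how an unqualified path gets resolved, here is a minimal sketch (the relative file name is illustrative, not from the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// A relative path is qualified against the default filesystem and the
// user's HDFS working directory, typically /user/<username>/.
FileSystem fs = FileSystem.get(new Configuration());
Path qualified = new Path("test.avro")
        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
System.out.println(qualified); // e.g. hdfs://HOST:PORT/user/hadoop/test.avro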

To upload files to HDFS and use them, I used absolute paths:

String hdfsRoot = new Configuration().get("fs.defaultFS"); // get the HDFS root, e.g. hdfs://HOST:PORT
....
FileSystem hdfs = getHdfsFileSystem();
// delSrc = true (delete the local copy), overwrite = true
hdfs.copyFromLocalFile(true, true, new Path(srcLocalPath), new Path(destHdfsPath));

Where destHdfsPath is an absolute path (like 'hdfs://...../test.avro').
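For reference, a self-contained version of the upload step might look like the sketch below; getHdfsFileSystem() is this answer's own helper, and FileSystem.get(conf), srcLocalPath and destHdfsPath are assumed stand-ins:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class HdfsUpload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hdfsRoot = conf.get("fs.defaultFS");         // e.g. hdfs://HOST:PORT
        FileSystem hdfs = FileSystem.get(conf);             // assumed equivalent of getHdfsFileSystem()

        String srcLocalPath = "/tmp/test.avro";             // hypothetical local source file
        String destHdfsPath = hdfsRoot + "/data/test.avro"; // absolute HDFS destination

        // delSrc = true (remove the local copy), overwrite = true
        hdfs.copyFromLocalFile(true, true, new Path(srcLocalPath), new Path(destHdfsPath));
    }
}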

Then you can load the data from HDFS:

return getSparkSession()
                .read()
                .format(AVRO_DATA_BRICKS_LIBRARY)
                .load(absoluteFilePath);

NOTE: you may need to add some permissions: FileUtil.chmod(hdfsDest, "u+rw,g+rw,o+rw");
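FileUtil.chmod changes permissions on a local file; for a path that lives on HDFS itself, FileSystem.setPermission is the usual equivalent. A sketch, with hdfsDest as a placeholder just like in the note above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

// Grant rw to user/group/other (0666) on an HDFS path.
FileSystem hdfs = FileSystem.get(new Configuration());
hdfs.setPermission(new Path(hdfsDest), new FsPermission((short) 0666));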

Upvotes: 0

user6860682

Reputation:

Check whether you have that file in your HDFS:

hadoop fs -ls /home/spark/   # or your working directory instead of /home/spark

If the file is on HDFS, the problem looks to be on the Spark side; follow the instructions in the exception message, or update your Spark version to the latest.
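The same check can also be done programmatically from the driver; a minimal sketch (the path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Verify the file is actually visible on HDFS before asking Spark to load it.
FileSystem hdfs = FileSystem.get(new Configuration());
boolean exists = hdfs.exists(new Path("/home/spark/test.avro")); // illustrative path
System.out.println("exists on HDFS: " + exists);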

Upvotes: 1
