Reputation: 103
I have built a recommendation system using Apache Spark, with datasets stored locally in my project folder; now I need to access these files from HDFS.
How can I read files from HDFS using Spark?
This is how I initialize my Spark session:
SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local")
.set("fs.default.name", "hdfs://localhost:54310").set("fs.defaultFS", "hdfs://localhost:54310"));
Configuration conf = context.hadoopConfiguration();
conf.addResource(new Path("/usr/local/hadoop-3.1.2/etc/hadoop/core-site.xml"));
conf.addResource(new Path("/usr/local/hadoop-3.1.2/etc/hadoop/hdfs-site.xml"));
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.set("fs.hdfs.impl", "org.apache.hadoop.fs.LocalFileSystem");
this.session = SparkSession.builder().sparkContext(context).getOrCreate();
System.out.println(conf.getRaw("fs.default.name"));
System.out.println(context.getConf().get("fs.defaultFS"));
Both print statements output hdfs://localhost:54310, which is the correct URI for my HDFS.
When trying to read a file from HDFS:
session.read().option("header", true).option("inferSchema", true).csv("hdfs://localhost:54310/recommendation_system/movies/ratings.csv").cache();
I get this error:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:54310/recommendation_system/movies/ratings.csv, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:730)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86)
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:636)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:65)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:281)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2034)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:361)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:360)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at com.dastamn.sparkml.analytics.SparkManager.<init>(SparkManager.java:36)
at com.dastamn.sparkml.Main.main(Main.java:22)
What can I do to solve this issue?
Upvotes: 3
Views: 13570
Reputation: 103
Here's the configuration that solved the problem:
SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local[*]")
.set("spark.hadoop.fs.default.name", "hdfs://localhost:54310").set("spark.hadoop.fs.defaultFS", "hdfs://localhost:54310")
.set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName())
.set("spark.hadoop.fs.hdfs.server", org.apache.hadoop.hdfs.server.namenode.NameNode.class.getName())
.set("spark.hadoop.conf", org.apache.hadoop.hdfs.HdfsConfiguration.class.getName()));
this.session = SparkSession.builder().sparkContext(context).getOrCreate();
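With this in place, the file can be read using either the full HDFS URI or just the path, since fs.defaultFS now points at hdfs://localhost:54310. A minimal usage sketch (the ratings.csv path is the one from the question; the variable name is just for illustration):
Dataset<Row> ratings = session.read()
        .option("header", true)
        .option("inferSchema", true)
        .csv("/recommendation_system/movies/ratings.csv") // resolved against hdfs://localhost:54310
        .cache();
System.out.println(ratings.count()); // forces the read, confirming HDFS access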
Upvotes: 4
Reputation: 1054
A few things about the pasted code snippet:
1. When a Hadoop property has to be set as part of the SparkConf, it has to be prefixed with spark.hadoop.; in this case the key fs.default.name needs to be set as spark.hadoop.fs.default.name, and likewise for the other properties.
2. The argument to the csv function does not need to include the HDFS endpoint; Spark will figure it out from the default filesystem properties, since they are already set:
session.read().option("header", true).option("inferSchema", true).csv("/recommendation_system/movies/ratings.csv").cache();
If the default filesystem properties are not set as part of the HadoopConfiguration, the complete URI is required for Spark/Hadoop to figure out which filesystem to use. (Also, the object named conf is not used.)
3. In the above case, it looks like Hadoop was not able to find a FileSystem implementation for the hdfs:// URI prefix and resorted to using the default filesystem, which is local in this case (since it is using RawLocalFileSystem to process the path). Make sure that hadoop-hdfs.jar, which contains DistributedFileSystem, is present on the classpath so that the FS objects for HDFS can be instantiated (a quick check is sketched below).
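A minimal sketch of such a check, assuming the same NameNode address as in the question; it asks a fresh Hadoop Configuration which FileSystem implementation is registered for the hdfs:// scheme, independent of any overrides made on the SparkConf:
// Run from any method that can throw IOException.
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
// Resolve the FileSystem registered for the hdfs:// scheme.
org.apache.hadoop.fs.FileSystem fs =
        org.apache.hadoop.fs.FileSystem.get(java.net.URI.create("hdfs://localhost:54310"), hadoopConf);
// Should print org.apache.hadoop.hdfs.DistributedFileSystem when hadoop-hdfs is on the classpath;
// if it is missing, the call fails because no filesystem is registered for the scheme.
System.out.println(fs.getClass().getName());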
Upvotes: 4