Dastamn

Reputation: 103

How to read files from HDFS using Spark?

I have built a recommendation system using Apache Spark with datasets stored locally in my project folder; now I need to access these files from HDFS.

How can I read files from HDFS using Spark?

This is how I initialize my Spark session:

SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local")
        .set("fs.default.name", "hdfs://localhost:54310").set("fs.defaultFS", "hdfs://localhost:54310"));
Configuration conf = context.hadoopConfiguration();
conf.addResource(new Path("/usr/local/hadoop-3.1.2/etc/hadoop/core-site.xml"));
conf.addResource(new Path("/usr/local/hadoop-3.1.2/etc/hadoop/hdfs-site.xml"));
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.set("fs.hdfs.impl", "org.apache.hadoop.fs.LocalFileSystem");
this.session = SparkSession.builder().sparkContext(context).getOrCreate();
System.out.println(conf.getRaw("fs.default.name"));
System.out.println(context.getConf().get("fs.defaultFS"));

Both print statements output hdfs://localhost:54310, which is the correct URI for my HDFS.

When trying to read a file from HDFS:

session.read().option("header", true).option("inferSchema", true).csv("hdfs://localhost:54310/recommendation_system/movies/ratings.csv").cache();

I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:54310/recommendation_system/movies/ratings.csv, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:730)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86)
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:636)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
    at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:65)
    at org.apache.hadoop.fs.Globber.doGlob(Globber.java:281)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2034)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
    at scala.Option.getOrElse(Option.scala:138)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
    at scala.Option.getOrElse(Option.scala:138)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:360)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at com.dastamn.sparkml.analytics.SparkManager.<init>(SparkManager.java:36)
    at com.dastamn.sparkml.Main.main(Main.java:22)

What can I do to solve this issue?

Upvotes: 3

Views: 13570

Answers (2)

Dastamn

Reputation: 103

Here's the configuration that solved the problem:

SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local[*]")
        .set("spark.hadoop.fs.default.name", "hdfs://localhost:54310")
        .set("spark.hadoop.fs.defaultFS", "hdfs://localhost:54310")
        .set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName())
        .set("spark.hadoop.fs.hdfs.server", org.apache.hadoop.hdfs.server.namenode.NameNode.class.getName())
        .set("spark.hadoop.conf", org.apache.hadoop.hdfs.HdfsConfiguration.class.getName()));
this.session = SparkSession.builder().sparkContext(context).getOrCreate();
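
For reference, a read with this configuration could look like the following. This is only a sketch reusing the path from the question; since spark.hadoop.fs.defaultFS points at HDFS, a plain path resolves against hdfs://localhost:54310.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// The default filesystem is now HDFS, so a plain path resolves to hdfs://localhost:54310;
// the full URI "hdfs://localhost:54310/..." works as well.
Dataset<Row> ratings = session.read()
        .option("header", true)
        .option("inferSchema", true)
        .csv("/recommendation_system/movies/ratings.csv")
        .cache();
ratings.show(5);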

Upvotes: 4

DaRkMaN

Reputation: 1054

A couple of things stand out in the code snippet you pasted:
1. When a Hadoop property is set through SparkConf, it has to be prefixed with spark.hadoop.; in this case the key fs.default.name needs to be set as spark.hadoop.fs.default.name, and likewise for the other properties.
2. The argument to the csv function does not need to include the HDFS endpoint; Spark will figure it out from the default filesystem properties, since they are already set:

session.read().option("header", true).option("inferSchema", true).csv("/recommendation_system/movies/ratings.csv").cache();

If the default filesystem properties are not set as part of the HadoopConfiguration, the complete URI is required for Spark/Hadoop to figure out which filesystem to use.
(Also, the conf object is never used.)
3. In the case above, it looks like Hadoop was not able to find a FileSystem implementation for the hdfs:// URI scheme and fell back to the default filesystem, which is local in this case (it is using RawLocalFileSystem to process the path).
Make sure that hadoop-hdfs.jar, which contains DistributedFileSystem, is present on the classpath so that FileSystem objects for HDFS can be instantiated; a quick check is sketched below.
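
An illustrative way to check which FileSystem implementation Hadoop resolves for the hdfs:// scheme is the snippet below. It assumes the SparkContext from the question is in scope as context and is only a diagnostic sketch, not part of the fix itself.

import java.net.URI;
import org.apache.hadoop.fs.FileSystem;

// If hadoop-hdfs (DistributedFileSystem) is on the classpath and the default FS is configured,
// this prints org.apache.hadoop.hdfs.DistributedFileSystem; a local implementation here matches
// the "Wrong FS ... expected: file:///" symptom from the question.
// (FileSystem.get throws IOException, so call this from code that declares or handles it.)
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:54310"), context.hadoopConfiguration());
System.out.println(fs.getClass().getName());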

Upvotes: 4
