D. Gal

Reputation: 329

Spark-SQL: access file in current worker node directory

I need to read a file with Spark SQL; the file is in the current working directory.

I use the following command to decompress each of the files I have stored on HDFS:

val decompressCommand = Seq(laszippath, "-i", inputFileName , "-o", "out.las").!!

The file is written to the worker node's current directory; I know this because running "ls -a"!! from Scala shows that the file is there. I then try to read it with the following command:

val dataFrame = sqlContext.read.las("out.las")

I assumed that the SQL context would look for the file in the current directory, but it doesn't. It also doesn't throw an error at that point, only a warning stating that the file could not be found, so Spark continues to run.

I also tried adding the file with sparkContext.addFile("out.las") and then getting its location with val location = SparkFiles.get("out.las"), but this didn't work either.

I even ran val locationPt = "pwd"!! followed by val fullLocation = locationPt + "/out.las" and tried to use that value, but it didn't work either.
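One detail in the "pwd" attempt is worth checking: !! returns the command's output including its trailing newline, so locationPt + "/out.las" ends up with a line break in the middle of the path. In addition, on a cluster whose default filesystem is HDFS, a path without a scheme is normally resolved against HDFS rather than the local disk. Below is a minimal sketch of building a trimmed, explicitly local path, assuming a Unix-like node; the names LocalPathSketch, workingDir and localUri are hypothetical and only for illustration.

```scala
import java.io.File
import scala.sys.process._

object LocalPathSketch {
  // "pwd".!! returns the output including its trailing newline,
  // so it has to be trimmed before it is used as part of a path.
  def workingDir(): String = "pwd".!!.trim

  // Build a URI with an explicit file: scheme so the path is not
  // resolved against the cluster's default filesystem.
  def localUri(dir: String, name: String): String =
    new File(dir, name).toURI.toString

  def main(args: Array[String]): Unit = {
    val fullLocation = localUri(workingDir(), "out.las")
    println(fullLocation)
  }
}
```

Even with a correct file: URI, the read can only succeed if the file exists on the node that actually performs the read, which is not guaranteed when the driver and the executors run on different machines.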

The actual exception that gets thrown is the following:

User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve 'x' given input columns: [];
org.apache.spark.sql.AnalysisException: cannot resolve 'x' given input columns: []

This happens when I try to access column "x" of the DataFrame. I know that this column exists because I've downloaded some of the files from HDFS, decompressed them locally and run some tests.

I need to decompress the files one by one because I have 1.6 TB of data, so I cannot decompress everything in one go and access the files later.

Can anyone tell me how to access files that are written to the worker node's directory? Or should I be doing this some other way?

Upvotes: 1

Views: 784

Answers (2)

D. Gal

Reputation: 329

So I managed to do it now. I save the decompressed file to HDFS and then read it back from HDFS through the SQL context. I overwrite "out.las" in HDFS each time so that I don't have to take up too much space.
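A rough sketch of that staging step, in case it helps someone else. It uses the Hadoop FileSystem API rather than whatever mechanism was actually used here, and the object name StageToHdfsSketch and the target path in the usage comment are made up for illustration.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object StageToHdfsSketch {
  // Copies a local file to the target path on the given filesystem.
  // copyFromLocalFile(delSrc, overwrite, src, dst):
  //   delSrc = false keeps the local file,
  //   overwrite = true replaces the copy left by the previous iteration.
  def stage(fs: FileSystem, localFile: String, target: String): Path = {
    val dst = new Path(target)
    fs.copyFromLocalFile(false, true, new Path(localFile), dst)
    dst
  }
}

// Possible usage inside the Spark job (the HDFS path is hypothetical):
//   val fs = FileSystem.get(sc.hadoopConfiguration)
//   val staged = StageToHdfsSketch.stage(fs, "out.las", "/user/me/tmp/out.las")
//   val dataFrame = sqlContext.read.las(staged.toString)
```

Because every iteration overwrites the same target, only one decompressed file occupies HDFS at a time, at the cost of processing the files strictly one after another.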

Upvotes: 1

uh_big_mike_boi

Reputation: 3470

I have used the Hadoop API to get to files before; I don't know whether it will help you here.

import org.apache.hadoop.fs.FileSystem

val filePath = "/user/me/dataForHDFS/"
val fs: FileSystem = FileSystem.get(new java.net.URI(filePath + "out.las"), sc.hadoopConfiguration)

I have not tested the code below, so treat it only as a rough idea of what to do afterward.

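import org.apache.hadoop.fs.{FSDataInputStream, Path}
val file = new Path(filePath + "out.las")
// Size the buffer to the file length; this assumes a single file fits in memory.
val readIn = new Array[Byte](fs.getFileStatus(file).getLen.toInt)
val fileIn: FSDataInputStream = fs.open(file)
try fileIn.readFully(0, readIn) finally fileIn.close()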

Upvotes: 0
