Dandelon

Reputation: 13

List files in directory (including file information) with Scala/Spark

I'm pretty new to Scala/Spark and I hope you guys can help me. I want to get the files which were created after a certain timestamp in an HDFS directory, for a little monitoring in Zeppelin. Therefore I need a column with the file name, the file size and the modification date.

I found this is working for me to get all the information I need:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val dir: String = "some/hdfs/path"
val input_files = fs.listStatus(new Path(dir)).filter(_.getModificationTime > timeInEpoch)

With the result I would like to create a DataFrame in Spark, with one row per file containing its information (or at least the information mentioned above):

val data = sc.parallelize(input_files) 
val dfFromData2 = spark.createDataFrame(data).toDF()

If I try it this way I get the following response:

309: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.hadoop.fs.FileStatus])
       val dfFromData2 = spark.createDataFrame(data).toDF()

I hope you can help me out :)

Greetings

Upvotes: 1

Views: 2530

Answers (1)

Charlie Flowers

Reputation: 1380

As the error message indicates, the Hadoop FileStatus type is not a subtype of Product, i.e. not a tuple or case class. Spark DataFrames use their own SQL-style type system, which doesn't allow arbitrary complex types like FileStatus. Likewise, if you attempted an operation on the RDD you created, you would get a similar error, since FileStatus is not serializable. Your best bet is to extract the fields you need into a tuple or case class and create the DataFrame from that:

// requires `import spark.implicits._` in scope for the toDF() conversion
case class FileInfo(name: String, modifiedTime: Long, size: Long)

val df = input_files.map { x =>
  FileInfo(x.getPath.toString, x.getModificationTime, x.getLen)
}.toSeq.toDF()
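Alternatively, since plain tuples are also Products, you can skip the case class and name the columns directly in toDF. A minimal sketch, assuming the `input_files` array from the question and `import spark.implicits._` in scope:

```scala
// Tuples satisfy the Product bound on createDataFrame/toDF as well;
// column names are supplied explicitly instead of coming from field names.
val df = input_files
  .map(s => (s.getPath.toString, s.getModificationTime, s.getLen))
  .toSeq
  .toDF("name", "modifiedTime", "size")
```

The case-class version is usually preferable, since the field names document the schema and you can map the DataFrame back to a typed Dataset later.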

Upvotes: 2
