Reputation: 13
I'm pretty new to Scala/Spark and I hope you can help me. I want to get the files that were created after a certain timestamp in an HDFS directory, for a little monitoring in Zeppelin. To do that, I need a column with the file name, the file size and the modification date.
I found this is working for me to get all the information I need:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val dir: String = "some/hdfs/path"
val input_files = fs.listStatus(new Path(dir)).filter(_.getModificationTime > timeInEpoch)
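For context, getModificationTime returns milliseconds since the Unix epoch, so timeInEpoch is just a Long cutoff in the same unit. A minimal sketch of how it could be derived (the 24-hour window here is only an assumed example):

// Assumed example: keep files modified within the last 24 hours
val timeInEpoch: Long = System.currentTimeMillis() - 24L * 60 * 60 * 1000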
From that result I would like to create a Spark DataFrame with one row per file containing its information (or at least the information mentioned above):
val data = sc.parallelize(input_files)
val dfFromData2 = spark.createDataFrame(data).toDF()
If I try it this way, I get the following error:
309: error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.hadoop.fs.FileStatus])
val dfFromData2 = spark.createDataFrame(data).toDF()
I hope you can help me out :)
Greetings
Upvotes: 1
Views: 2530
Reputation: 1380
As the error message indicates, the Hadoop FileStatus type is not a subtype of Product, i.e. a tuple or case class. Spark DataFrames have their own SQL-style type system which doesn't allow for arbitrary, complex types like FileStatus. Likewise, if you were to attempt an operation on the RDD you created, you would hit a similar error because FileStatus is not serializable. Your best bet is to extract the data you need into a tuple or case class and create a DataFrame from that:
// Needed so .toDF() is available on a local collection
import spark.implicits._

case class FileInfo(name: String, modifiedTime: Long, size: Long)

val df = input_files.map { x =>
  FileInfo(x.getPath.toString, x.getModificationTime, x.getLen)
}.toSeq.toDF()
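From there, the monitoring view in Zeppelin is an ordinary DataFrame query. A small usage sketch (the sort order and row limit are just assumptions):

// Hypothetical follow-up: show the most recently modified files first
df.orderBy($"modifiedTime".desc).show(20, truncate = false)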
Upvotes: 2