Reputation: 109
I have two Spark jobs: one is a batch job and the other is a Structured Streaming job. Both write to the same file sink and both have the same schema. However, when reading data from this sink, Spark only reads the files created by the streaming job and skips the files created by the batch job. I can see a _spark_metadata directory in the file sink folder. When I delete this folder, Spark starts reading all files, but this is not always possible because the next micro-batch will create the _spark_metadata folder again. How can I read all files from this sink in Spark?
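For reference, this is roughly what the two writers and the read look like; the paths, the eventSchema, and the parquet format below are just placeholders for my actual setup:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("sink-example").getOrCreate()
val sinkPath = "/data/output/events"            // placeholder sink directory
val eventSchema = StructType(Seq(               // placeholder schema
  StructField("id", StringType),
  StructField("payload", StringType)))

// Batch job: appends plain parquet files into the sink directory.
spark.read.parquet("/data/input/batch")
  .write.mode("append").parquet(sinkPath)

// Streaming job: writes to the same directory and maintains _spark_metadata there.
spark.readStream
  .schema(eventSchema)
  .parquet("/data/input/stream")
  .writeStream
  .format("parquet")
  .option("path", sinkPath)
  .option("checkpointLocation", "/data/checkpoints/events")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// A later read of the sink only returns the files listed in _spark_metadata,
// i.e. the streaming output; the batch files are skipped.
val all = spark.read.parquet(sinkPath)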
Upvotes: 0
Views: 666
Reputation: 10362
I ran into the same problem, along with the issues below. The code further down fixed them for me; maybe it will help you as well.
Issue 1: Spark throws the exception below if you read data directly from the streaming directory.
java.io.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
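To show roughly how that can surface (the path is a placeholder and this is only a sketch): the DataFrame holds on to the file listing it saw when it was planned or cached, and if the streaming job changes the files underneath before the next action, some of the listed files no longer exist.

// Placeholder path; the streaming job is still writing into this directory.
val df = spark.read.format("json").load("/data/output/events")
df.cache()
df.count()   // first action works and pins the current file listing

// ... the streaming job adds or replaces files in the meantime ...

df.count()   // may now fail with java.io.FileNotFoundException ("The underlying
             // files may have been updated"); recreating the DataFrame,
             // df.unpersist(), or REFRESH TABLE on a registered table clears it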
Issue 2: Spark throws the exception below if the HDFS directory is empty and you try to load data from it. I am not passing a schema while loading the data; if you pass a schema you might not hit this issue.
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.
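If you do want to keep loading straight from the directory, passing an explicit schema is one way around the inference error on an empty directory; the schema and path below are only examples:

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// Example schema; replace with the real schema of your data.
val eventSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("event_time", TimestampType),
  StructField("payload", StringType)))

// With an explicit schema Spark does not have to sample files, so an empty
// directory typically gives an empty DataFrame instead of an AnalysisException.
val df = spark.read.schema(eventSchema).json("/data/output/events")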
Instead of pointing Spark at the HDFS directory while loading data, list the required file paths yourself and pass those paths to Spark's load method. With the code below you have more control over which files to read and which to ignore.
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession
import scala.language.implicitConversions

// Adapt Hadoop's RemoteIterator to a Scala Iterator so that toList, map, filter, etc. work.
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}

// Recursively list all files under `path`, skipping everything inside _spark_metadata.
def listFiles(spark: SparkSession, path: String): List[String] = {
  FileSystem.get(spark.sparkContext.hadoopConfiguration)
    .listFiles(new Path(path), true)
    .toList
    .map(_.getPath)
    .filter(!_.toString.contains("_spark_metadata"))
    .map(_.toString)
}

val files = listFiles(spark, kafka.read.hdfsLocation)
// Fail fast when the directory has no data files to read.
require(files.nonEmpty, "Files are not available to process data.")

spark
  .read
  .format(read.format)
  .options(read.options)
  .load(files: _*)
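For completeness, here is the same call with literal values in place of my own config object (kafka.read.hdfsLocation, read.format, read.options); the path and format are just examples:

val files = listFiles(spark, "/data/output/events")   // example sink directory
require(files.nonEmpty, "Files are not available to process data.")

val df = spark.read
  .format("json")            // or "parquet", depending on what the sinks write
  .load(files: _*)

df.printSchema()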
Upvotes: 1