Reputation: 3944
Our application processes live streaming data, which is written to parquet files. Every so often we start a new parquet file, but since there are updates every second or so, and the data needs to be searchable immediately as it comes in, we are constantly updating the "current" parquet file. We make these updates atomically: we generate a new parquet file containing the existing data plus the new data under a temporary filename, and then rename it over the existing file's filename via an atomic OS call.
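In case it helps, the swap step looks roughly like the following sketch (writeTmp stands in for the code that actually writes the combined old + new rows with our Parquet writer):

    import java.nio.file.{Files, Paths, StandardCopyOption}

    // Rough sketch of the swap step. writeTmp stands in for our actual
    // writer code, which produces the combined old + new rows.
    def replaceAtomically(liveFile: String, tmpFile: String)(writeTmp: String => Unit): Unit = {
      writeTmp(tmpFile) // write the full contents to a temporary name
      // then swap it in; on our platform this atomically replaces the existing file
      Files.move(Paths.get(tmpFile), Paths.get(liveFile), StandardCopyOption.ATOMIC_MOVE)
    }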
The problem is that if we are doing searches over the above described "semi-live" file, we are getting errors.
It likely doesn't matter, but the file is being written via AvroBasedParquetWriter.write().
The read is being done via a call to SparkSession.read.parquet(path). We then turn the DataFrame into a Dataset and do a count on it. Doing this throws the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1699.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1699.0 (TID 2802, localhost, executor driver): java.io.IOException: Could not read footer for file: FileStatus{path=; isDirectory=false; length=418280; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
My suspicion is that the way that the read is happening isn't atomic. Like maybe we are replacing the parquet file while the call to SparkSession.read.parquet() is actively reading it.
Is this read long-lived / non-atomic?
If so, would it be possible to lock the parquet file (via Scala/Java) in such a way that the call to SparkSession.read.parquet() would play nice (i.e. gracefully wait for me to release the lock before attempting to read from it)?
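To be concrete, the kind of lock I have in mind is an OS-level exclusive lock along these lines; whether Spark's reader would actually wait on it is exactly what I don't know:

    import java.nio.channels.FileChannel
    import java.nio.file.{Paths, StandardOpenOption}

    // Take an exclusive OS-level lock on the live file, run the swap, release.
    // Whether SparkSession.read.parquet() would wait on such a lock is the question.
    def withFileLock[T](path: String)(body: => T): T = {
      val channel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE)
      val lock = channel.lock() // blocks until the exclusive lock is acquired
      try body
      finally {
        lock.release()
        channel.close()
      }
    }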
Upvotes: 0
Views: 3866
Reputation: 18098
See https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html. There is no approach like yours there; instead it shows how concurrent writing and reading can be done, and with an old version of Spark at that! I would adopt their approach.
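Roughly, the pattern from that post is a Structured Streaming query that keeps appending Parquet output and lets Spark track which files are committed, rather than rewriting one file in place. A sketch only, with the Kafka source, paths and parsing as placeholders for whatever your pipeline uses (the Kafka source also needs the spark-sql-kafka package):

    import org.apache.spark.sql.SparkSession

    // Sketch of the Structured Streaming pattern: Spark appends new Parquet
    // files and records the committed ones in its own metadata log, instead
    // of rewriting a single file in place. Source, paths and parsing below
    // are placeholders.
    val spark = SparkSession.builder().appName("streaming-etl").getOrCreate()

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    val query = events
      .selectExpr("CAST(value AS STRING) AS json") // parse/shape as needed
      .writeStream
      .format("parquet")
      .option("path", "/data/events")
      .option("checkpointLocation", "/data/checkpoints/events")
      .start()

A batch spark.read.parquet("/data/events") should then only see files the sink has committed, which avoids the half-written-file race you are hitting.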
Upvotes: 0
Reputation: 3115
I'm not an expert on Spark SQL, but from a Parquet and Hive perspective, I see two separate issues in the scenario you describe:
Parquet is not fit for streaming usage. Avro or textfile is much better for that purpose, but they are not as efficient as Parquet, so the usual solution is to mix a row-oriented format used for short term with a column-oriented one used for the long term. In Hive, this is possible by streaming new data into a separate partition using the Avro or textfile format while the rest of the partitions are stored as Parquet. (I'm not sure whether Spark supports such a mixed scenario.)
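If Spark does support it, the read side of such a mixed layout could look something like this sketch (paths are illustrative, both sides are assumed to share the same schema, and the Avro read needs the spark-avro package):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: recent rows live in a row-oriented "hot" area (Avro),
    // history lives in Parquet, and queries union the two.
    val spark = SparkSession.builder().appName("mixed-format-read").getOrCreate()

    val hot  = spark.read.format("avro").load("/data/events/hot")   // freshly streamed rows
    val cold = spark.read.parquet("/data/events/cold")              // compacted history

    val all = hot.unionByName(cold)
    println(all.count())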
From time to time, streamed data needs to be compacted. In the scenario you describe, this happens after every write, but it is more typical to do this at some fixed time interval (for example hourly or daily) and let the new data reside in a sub-optimal format in-between. Unfortunately, this is more complicated in practice, because without some extra abstraction layer, compaction is not atomic, and as a result for brief periods the compacted data either disappears or gets duplicated. The solution is to use some additional logic to ensure atomicity, like Hive ACID or Apache Iceberg (incubating). If I remember correctly, the latter has a Spark binding, but I can't find a link to it.
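To make the atomicity problem concrete, a naive compaction job might look like the following sketch (paths illustrative); it is exactly the window between the two steps that Hive ACID or Iceberg close for you:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    // Naive compaction: copy the hot Avro rows into Parquet, then delete the
    // originals. The two steps are not atomic, so a query in between sees the
    // rows twice; doing the delete first would make them vanish instead.
    val spark = SparkSession.builder().appName("naive-compaction").getOrCreate()

    val hot = spark.read.format("avro").load("/data/events/hot")
    hot.write.mode("append").parquet("/data/events/cold")   // step 1: rewrite as Parquet

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path("/data/events/hot"), true)           // step 2: drop the hot files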
Upvotes: 1