Reputation: 73
Our Spark environment: Databricks 4.2 (includes Apache Spark 2.3.1, Scala 2.11)
What we are trying to achieve: We want to enrich streaming data with reference data that is updated regularly. The enrichment is done by joining the stream with the reference data.
What we implemented:
We implemented two Spark jobs (jars):

The first one updates the Spark table TEST_TABLE every hour (let's call it 'reference data') using <dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE") and afterwards calling spark.catalog.refreshTable("TEST_TABLE").
The second job (let's call it the streaming job) uses Spark Structured Streaming to read data as a stream, join it with the table TEST_TABLE via DataFrame.transform(), and write the result to another system. We read the reference data using spark.read.table("TEST_TABLE") inside the function called by .transform(), so we get the latest values from the table.
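Simplified, the streaming job looks roughly like this; the source, sink, and join key below are placeholders for our real ones:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder.getOrCreate()

    // Called via .transform(): re-reads the reference table, so we expect
    // to pick up the latest values.
    def enrichWithReferenceData(stream: DataFrame): DataFrame = {
      val referenceData = spark.read.table("TEST_TABLE")
      stream.join(referenceData, Seq("id"), "left_outer") // "id" stands in for our real key
    }

    val input = spark.readStream
      .format("rate")                      // placeholder for our real source
      .load()
      .withColumnRenamed("value", "id")

    input.transform(enrichWithReferenceData)
      .writeStream
      .format("parquet")                   // placeholder for the target system
      .option("path", "/output/enriched")
      .option("checkpointLocation", "/checkpoints/enrich")
      .start()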
Unfortunately, the second app crashes every time the first app updates the table. The following message appears in the Log4j output:
18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
We also tried to invalidate the cache before reading the table, but that decreased performance and the app crashed nevertheless. We suspect the root cause is the lazy evaluation of the reference dataset, which still 'points' to the old data that is no longer present.
Do you have any suggestions on what we could do to prevent this problem, or on what the best approach is for joining a stream with dynamic reference data?
Upvotes: 7
Views: 2678
Reputation: 18108
Join to the reference data and do not cache it; this ensures you always go to the source. Look for the latest version of the data, signified by a primary key plus a version counter, where the counter is closest to or equal to a counter you maintain in the streaming application. Every hour, write again: append all the reference data that is still current, but with an incremented counter, i.e. a new version. Use Parquet here.
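A minimal sketch of this versioning scheme, assuming Parquet files at a hypothetical path and a version column maintained by the publisher (all names here are illustrative, not from the question):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Hypothetical location of the versioned reference data.
    val refPath = "/data/reference"

    // Hourly publisher: append the still-current reference rows under a new,
    // strictly increasing version counter instead of overwriting in place.
    // Assumes at least one version has already been written to refPath.
    def publishNewVersion(current: DataFrame): Unit = {
      val latest = spark.read.parquet(refPath)
        .agg(max($"version")).as[Long].head()
      current
        .withColumn("version", lit(latest + 1))
        .write.mode("append").parquet(refPath)
    }

    // Streaming side: resolve the newest version at or below the counter
    // maintained by the streaming application, and join only against that slice.
    def referenceAt(counter: Long): DataFrame = {
      val ref = spark.read.parquet(refPath).where($"version" <= counter)
      val newest = ref.agg(max($"version")).as[Long].head()
      ref.where($"version" === newest)
    }

Because the hourly job only ever appends new files, the streaming side never reads files that are being deleted underneath it, which avoids the FileNotFoundException from the overwrite.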
Upvotes: 6
Reputation: 668
Instead of joining the table with the stream, you can take advantage of a new feature available in Spark 2.3.1: joining two streams. Create a stream instead of a table, with a watermark; see the sketch after the definition below.
Watermarks: Watermarking in Structured Streaming is a way to limit state in all stateful streaming operations by specifying how much late data to consider. Specifically, a watermark is a moving threshold in event-time that trails behind the maximum event-time seen by the query in the processed data. The trailing gap (aka watermark delay) defines how long the engine should wait for late data to arrive, and is specified in the query using withWatermark.
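A sketch of such a stream-stream join with watermarks, following the pattern in the Spark 2.3 documentation; the rate sources, column names, and delays are illustrative stand-ins:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // The main event stream (a rate source stands in for the real one).
    val events = spark.readStream.format("rate").load()
      .select($"value".as("key"), $"timestamp".as("eventTime"))
      .withWatermark("eventTime", "10 minutes")

    // Reference-data updates arriving as a second stream instead of a table.
    val refUpdates = spark.readStream.format("rate").load()
      .select($"value".as("refKey"), $"timestamp".as("refTime"))
      .withWatermark("refTime", "2 hours")

    // Inner stream-stream join; the time-range condition bounds the state
    // Spark must keep for each side.
    val enriched = events.join(
      refUpdates,
      expr("""
        key = refKey AND
        eventTime >= refTime AND
        eventTime <= refTime + interval 1 hour
      """))

    enriched.writeStream
      .format("console")
      .option("checkpointLocation", "/checkpoints/stream-join")
      .start()

The time-range condition together with the watermarks lets Spark drop state for rows that are too old, instead of buffering both streams indefinitely.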
Upvotes: 2