Reputation: 626
I am building a Spark Structured Streaming application in which I do a batch-stream join, and the source of the batch data is updated periodically.
So I am planning to persist and unpersist that batch data periodically.
Below is a sample of the code I am using to persist and unpersist the batch data.
Flow: read the batch data and persist it; once the refresh interval has elapsed, unpersist it, read it again from the source, and persist it again.
But I am not seeing the batch data getting refreshed every hour.
Code:
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF = handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}
Is there a better way to achieve this in Spark Structured Streaming jobs?
Upvotes: 6
Views: 5178
Reputation: 31
I've tried another method with Apache Iceberg, which many people have become interested in recently.
We can create an Iceberg table with the Apache Iceberg library; it is essentially a table format that replaces (upgrades) the Hive format. It works much the same as Delta, but the table behaves like a Hive table, which means we can query it with Spark SQL.
I've tried the same approach with an Iceberg table as with the Delta format, refreshing the static DataFrame via a rate-source stream, and it works well.
I used the Hadoop (HDFS) based method to create the Iceberg table (but I guess it would work with any catalog type).
Spark 3.1.2, Iceberg version 1.3.1 (It's the latest version for 3.1.2)
--packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.3.1
Besides, you need to set up the Iceberg settings (the dependent library and the Iceberg options), just as you would for Delta. In the configuration below, hadoop_cat is my custom catalog for the Iceberg table:
val spark = SparkSession.builder
...
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.hadoop_cat", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_cat.type","hadoop")
.config("spark.sql.catalog.hadoop_cat.warehouse","hdfs://../warehouse")
And refresh the static DataFrame like this:
var staticDf = spark.sql(
s"""
SELECT *
FROM hadoop_cat.test__db_dev.iceberg_poc_test
""")
staticDf.persist()
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.sql(
    s"""
    SELECT *
    FROM hadoop_cat.test__db_dev.iceberg_poc_test
    """)
  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe w/ Iceberg")
}
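The answer mentions driving the refresh with a rate-source stream but does not show the wiring; a minimal sketch, mirroring the Delta-based answer below, might look like the following (the stream/query names and the 1-hour trigger are assumptions, not from the original answer):
// Sketch of the rate stream that periodically calls foreachBatchMethod,
// adapted from the answer below to this Iceberg example.
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val staticRefreshStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load()
  .selectExpr("CAST(value AS LONG) AS trigger")
  .as[Long]

staticRefreshStream.writeStream
  .outputMode("append")
  .foreachBatch(foreachBatchMethod[Long] _)
  .queryName("IcebergRefreshStream")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start()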
Upvotes: 1
Reputation: 31
Let me add some additional tips on top of the solution that Michael Heil kindly provided above.
I spent quite a lot of time getting this to work with that solution. First, you need to use the SAME PATH for the data source. I tried to load from a different data path inside the batch method, but it did not work in that case. In other words, this:
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.read.format("delta").load(anotherDeltaPath)
  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}
It would not work.
In addition, I found the following out with the Spark UI: even without the foreachBatch method, if we declare staticDf as a var rather than a val, Spark refreshes the data source on every streaming batch.
So I tried the Json and Delta formats both with and without foreachBatch (O = the static data gets refreshed, X = it does not):
Data Type | w/ foreachBatch | w/o foreachBatch |
---|---|---|
Json | O | X |
Delta | O | O |
In all cases, staticDf was declared as a var.
Upvotes: 2
Reputation: 18475
You could do this by making use of the streaming scheduling capabilities that Structured Streaming provides.
You can trigger the refreshing (unpersist -> load -> persist) of a static Dataframe by creating an artificial "Rate" stream that refreshes the static Dataframe periodically. The idea is to:
1. Load the static Dataframe initially and keep it as a var.
2. Define a method that refreshes the static Dataframe.
3. Use a "Rate" stream that gets triggered at the required interval (e.g. 1 hour).
4. Read the actual streaming data and perform the join operation with the static Dataframe.
5. Within that Rate stream, have a foreachBatch sink that calls the refresher method created in step 2.
The following code runs fine with Spark 3.0.1, Scala 2.12.10 and Delta 0.7.0.
// 1. Load the staticDataframe initially and keep as `var`
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()
// 2. Define a method that refreshes the static Dataframe
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}
// 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
val staticRefreshStream = spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.option("numPartitions", 1)
.load()
.selectExpr("CAST(value as LONG) as trigger")
.as[Long]
// 4. Read actual streaming data and perform join operation with static Dataframe
// As an example I used Kafka as a streaming source
val streamingDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
val joinDf = streamingDf.join(staticDf, "id")
val query = joinDf.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/path/to/sparkCheckpoint")
.start()
// 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
staticRefreshStream.writeStream
.outputMode("append")
.foreachBatch(foreachBatchMethod[Long] _)
.queryName("RefreshStream")
.trigger(Trigger.ProcessingTime("5 seconds")) // or e.g. 1 hour
.start()
To make this a complete example, the Delta table was created and updated with new values as follows:
val deltaPath = "file:///tmp/delta/table"
import spark.implicits._
val df = Seq(
(1L, "static1"),
(2L, "static2")
).toDF("id", "deltaField")
df.write
.mode(SaveMode.Overwrite)
.format("delta")
.save(deltaPath)
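To simulate the static data changing while the streaming query is running, a new row could then be appended; this is a sketch with a made-up example row, relying on the spark.implicits._ import above:
// Append a hypothetical new row to the Delta table; the running refresh
// stream should pick it up on its next trigger.
Seq((3L, "static3")).toDF("id", "deltaField")
  .write
  .mode(SaveMode.Append)
  .format("delta")
  .save(deltaPath)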
Upvotes: 15