Shane

Reputation: 626

Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically

I am building a Spark Structured Streaming application that performs a stream-static (batch-stream) join. The source of the batch data is updated periodically.

So I am planning to persist the batch data and periodically unpersist and reload it.

Below is the sample code I am using to persist and unpersist the batch data.

However, the batch data is not being refreshed every hour.

Code:

var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF =  handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}

Is there a better way to achieve this in Spark Structured Streaming jobs?

Upvotes: 6

Views: 5178

Answers (3)

InJung Hwang

Reputation: 31

I tried another approach using Apache Iceberg, which has attracted a lot of interest recently.

Iceberg tables are created with the Apache Iceberg library. Iceberg is a table format that replaces (upgrades) the Hive table format. It works much like Delta, but its tables are addressed like Hive tables, so they can be queried with Spark SQL.

I used the same approach as with Delta, refreshing the static Dataframe from a rate source stream, and it works well with an Iceberg table.

I created the Iceberg table with the Hadoop (HDFS) based catalog, but I would guess any catalog type works.

Versions: Spark 3.1.2 and Iceberg 1.3.1 (the latest Iceberg version available for Spark 3.1.2).

--packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.3.1

As with Delta, you also need to configure Iceberg (the dependency above plus the session options below). Here, hadoop_cat is the name of my custom catalog for Iceberg tables.

val spark = SparkSession.builder
...
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.hadoop_cat", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_cat.type","hadoop")
  .config("spark.sql.catalog.hadoop_cat.warehouse","hdfs://../warehouse")

Then refresh the static Dataframe like this:

var staticDf = spark.sql(
s"""
SELECT *
FROM hadoop_cat.test__db_dev.iceberg_poc_test
""")

staticDf.persist()

def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.sql(
  s"""
  SELECT *
  FROM hadoop_cat.test__db_dev.iceberg_poc_test
  """)

  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe w/ Iceberg")
}

Upvotes: 1

InJung Hwang

Reputation: 31

Let me add a few tips on top of the solution Michael Heil kindly provided in his answer.

I spent quite a lot of time getting that solution to work. First, you need to use the SAME PATH for the data source. I tried loading from a different data path inside the batch, and it did not work. In other words:

var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(anotherDeltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

This does not work.

In addition, I found the following from the Spark UI: even without the foreachBatch method, if staticDf is declared as a var rather than a val, Spark re-reads the data source on every streaming micro-batch.

So I tested both the JSON and Delta formats, with and without foreachBatch.

Data type | w/ foreachBatch | w/o foreachBatch
JSON      | O               | X
Delta     | O               | O

(O = the refresh works, X = it does not.)

  • JSON only works with foreachBatch (without it, Spark still re-reads the data source, but the refresh does not take effect).
  • Delta works even without foreachBatch, as long as the Dataframe is declared as a var.

Upvotes: 2

Michael Heil

Reputation: 18475

You could do this by making use of the streaming scheduling capabilities that Structured Streaming provides.

You can trigger the refreshing (unpersist -> load -> persist) of a static Dataframe by creating an artificial "Rate" stream that refreshes the static Dataframe periodically. The idea is to:

  1. Load the static Dataframe initially and keep it as a var
  2. Define a method that refreshes the static Dataframe
  3. Use a "Rate" stream that gets triggered at the required interval (e.g. 1 hour)
  4. Read the actual streaming data and join it with the static Dataframe
  5. Give that Rate stream a foreachBatch sink that calls the refresher method defined in step 2
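
For context on why a recurring trigger is needed at all: assuming the snippet in the question runs once on the driver as posted (not inside a loop or callback), its elapsed-time check is evaluated a single time, right after the initial load, when almost no time has passed. A minimal Spark-free sketch of the difference, where `RefreshTimer` is a hypothetical helper and not part of Spark:

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not a Spark API): remembers when the last refresh happened.
final class RefreshTimer(interval: Duration, startedAt: Instant) {
  private var lastRefresh: Instant = startedAt

  // Returns true, and resets the timer, once at least `interval` has elapsed.
  def refreshIfDue(now: Instant): Boolean = {
    val due = Duration.between(lastRefresh, now).compareTo(interval) >= 0
    if (due) lastRefresh = now
    due
  }
}

val start = Instant.parse("2021-01-01T00:00:00Z")
val timer = new RefreshTimer(Duration.ofHours(1), start)

// Roughly what the question's code does: a single check right after loading.
// Only 1 second has elapsed, so this is false and is never evaluated again.
val checkedOnceAtStartup = timer.refreshIfDue(start.plusSeconds(1))

// What a recurring trigger provides: the same check on every firing (here every 30 minutes).
val firings = (1 to 5).map(i => start.plus(Duration.ofMinutes(30L * i)))
val refreshedAt = firings.filter(timer.refreshIfDue)
```

Here the one-off check returns false, while the repeated check fires at the 1-hour and 2-hour marks. The Rate stream in step 3 plays the role of those repeated firings.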

The following code runs fine with Spark 3.0.1, Scala 2.12.10 and Delta 0.7.0.

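  // Imports needed by the snippets below
  import java.util.Calendar
  import org.apache.spark.sql.Dataset
  import org.apache.spark.sql.streaming.Trigger
  import spark.implicits._ // required for .as[Long]

  // 1. Load the staticDataframe initially and keep as `var`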
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds")) // or e.g. 1 hour
    .start()

  // In a standalone application (not spark-shell), block the driver so both queries keep running
  spark.streams.awaitAnyTermination()

To complete the example, the Delta table was created, and later updated with new values, as follows:

  val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)
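
One way to produce such an update, as a sketch only: append rows to the same path with `SaveMode.Append`. The local `SparkSession` settings and the row `(3L, "static3")` are assumptions for illustration, and the Delta library (for example delta-core 0.7.0 with Spark 3.0.1) must be on the classpath.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: a local session for illustration; in the application, reuse the existing `spark`.
val spark = SparkSession.builder
  .master("local[1]")
  .appName("delta-append-sketch")
  .getOrCreate()
import spark.implicits._

// Same path that the static Dataframe is loaded from.
val deltaPath = "file:///tmp/delta/table"

// Hypothetical new row, written as an additional commit to the table at deltaPath.
Seq((3L, "static3"))
  .toDF("id", "deltaField")
  .write
  .mode(SaveMode.Append)
  .format("delta")
  .save(deltaPath)
```

After the next firing of the refresh stream, the join should start matching `id = 3`, provided the path is the same one `staticDf` was loaded from.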

Upvotes: 15
