BdEngineer

Reputation: 3209

How to refresh loaded dataframe contents in spark streaming?

Using spark-sql 2.4.1 and Kafka for real-time streaming, I have the following use case:

  1. Load meta-data from HDFS to join with the streaming dataframe coming from Kafka.
  2. Look up particular columns of each streaming record against a particular column (col-X) of the meta-data dataframe. If a match is found, pick the meta-data column (col-Y) value. If not found, insert the streaming record's column data into the meta-data dataframe, i.e. into HDFS, so that it can be looked up when the streaming dataframe contains the same data again.

Since the meta-data is loaded once at the beginning of the Spark job, how can I refresh its contents inside the streaming job so that it can be looked up and joined with the streaming dataframe?

Upvotes: 1

Views: 2996

Answers (3)

Below is the scenario I followed in Spark 2.4.5 for a stream join with a batch dimension (a left outer join in my case). The process below pushes Spark to read the latest dimension data changes.

The process is for a stream join with a batch dimension (always updated):

Step 1:-

Before starting the Spark streaming job:- Make sure the dimension batch data folder has only one file, and that the file has at least one record (for some reason placing an empty file does not work).

Step 2:- Start your streaming job and add a stream record to the Kafka stream.

Step 3:- Overwrite the dimension data with new values (the file must keep the same name, and the dimension folder must still contain only that one file). Note:- do not use Spark to write to this folder; use Java or Scala file I/O to overwrite the file, or delete it with bash and replace it with a new data file of the same name, as sketched below.
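
A minimal sketch of that overwrite using plain java.nio file I/O; the file name and the sample rows below are made up for illustration and must match your own dimension schema and folder:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object OverwriteDimensionFile {
  def main(args: Array[String]): Unit = {
    // hypothetical path: the one and only file inside the dimension folder
    val dimFile = Paths.get("src/main/resources/broadcasttest/dimension/dim.csv")

    // new contents: same header, same file name, written without involving Spark
    val newRows =
      """id,countrycode,countryname,timestamp_column_fin_2
        |1,IN,India,2020-02-02 10:00:00
        |2,US,United States,2020-02-02 10:00:00
        |""".stripMargin

    // Files.write creates/truncates the file and replaces its contents in place
    Files.write(dimFile, newRows.getBytes(StandardCharsets.UTF_8))
  }
}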

Step 4:- In the next batch, Spark is able to read the updated dimension data while joining it with the Kafka stream...

Sample Code:-

package com.broccoli.streaming.streamjoinupdate

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object BroadCastStreamJoin3 {

  def main(args: Array[String]): Unit = {
    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    // streaming fact source: a CSV folder stands in for the Kafka stream in this example
    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")


    // batch dimension source: the single file here is overwritten externally between batches,
    // so the next micro-batch reads the latest contents
    val dimDf3 = spark.read
      .schema(schemaUntyped2)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/dimension")
      .withColumnRenamed("id", "id_2")
      .withColumnRenamed("countrycode", "countrycode_2")

    import spark.implicits._

    factDf1
      .join(
        dimDf3,
        $"countrycode_2" <=> $"countrycode",
        "inner"
      )
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination

  }
}

Thanks Sri

Upvotes: 0

Jacek Laskowski

Reputation: 74769

I may have misunderstood the question, but refreshing the metadata dataframe should be a feature supported out of the box.

You simply don't have to do anything.

Let's have a look at the example:

// a batch dataframe
val metadata = spark.read.text("metadata.txt")
scala> metadata.show
+-----+
|value|
+-----+
|hello|
+-----+

// a streaming dataframe
val stream = spark.readStream.text("so")

// join on the only value column
stream.join(metadata, "value").writeStream.format("console").start

As long as the content of the files in the so directory matches the metadata.txt file, you should get a dataframe printed out to the console.

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|hello|
+-----+

Change metadata.txt to, say, world, and only world rows from new files get matched.

Upvotes: 1

shanmuga

Reputation: 4499

EDIT: This solution is more elaborate and would work for all use cases.
For simpler cases, where the data is only appended to existing files without changing them, or is read from a database, the simpler solution pointed out in the other answer can be used.
This is because the dataframe (and underlying RDD) partitions are created once, and the data is read every time the dataframe is used (unless it is cached by Spark).


If you can afford it, you can try to (re)read this meta-data dataframe in every micro-batch, as sketched below.
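
A minimal sketch of that per-micro-batch re-read, assuming a streaming dataframe named streamingDf, a hypothetical metadata path on HDFS, and a hypothetical join column colX:

import org.apache.spark.sql.DataFrame

// re-read the meta-data at the start of every micro-batch so external updates become visible;
// the path and the column name are placeholders
val refreshAndJoin: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  val spark = batchDf.sparkSession
  val metadata = spark.read.parquet("hdfs:///path/to/metadata")
  batchDf.join(metadata, Seq("colX"), "left_outer").show(false)
}

streamingDf.writeStream
  .foreachBatch(refreshAndJoin)
  .start()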

A better approach would be to put the meta-data dataframe in a cache (not to be confused with Spark caching the dataframe). A cache is similar to a map, except that it will not return entries that were inserted more than the configured time-to-live duration ago.

In your code, you'll try to fetch this meta-data dataframe from the cache once for every micro-batch. If the cache returns nothing, you'll read the dataframe again, put it into the cache, and then use it.

The Cache class would be

import scala.collection.mutable

// cache class to store the dataframe
class Cache[K, V](timeToLive: Long) extends mutable.Map[K, V] {
  private var keyValueStore = mutable.HashMap[K, (V, Long)]()

  // return the value only if it was inserted within the time-to-live window
  override def get(key: K): Option[V] = {
    keyValueStore.get(key) match {
      case Some((value, insertedAt)) if insertedAt + timeToLive > System.currentTimeMillis => Some(value)
      case _ => None
    }
  }

  override def iterator: Iterator[(K, V)] = keyValueStore.iterator
    .filter({
      case (key, (value, insertedAt)) => insertedAt+timeToLive > System.currentTimeMillis
    }).map(x => (x._1, x._2._1))

  override def -=(key: K): this.type = {
    keyValueStore-=key
    this
  }

  override def +=(kv: (K, V)): this.type = {
    keyValueStore += ((kv._1, (kv._2, System.currentTimeMillis())))
    this
  }
}

The logic to access the meta-data dataframe through the cache

import org.apache.spark.sql.DataFrame

object DataFrameCache {
  lazy val cache = new Cache[String, DataFrame](600000) // ten minutes timeToLive

  def readMetaData: DataFrame = ???

  def getMetaData: DataFrame = {
    cache.get("metadataDF") match {
      case Some(df) => df
      case None => {
        val metadataDF = readMetaData
        cache.put("metadataDF", metadataDF)
        metadataDF
      }
    }
  }
}
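
And a hedged usage sketch of consulting the cache once per micro-batch (the streaming dataframe streamingDf and the join column colX are hypothetical):

import org.apache.spark.sql.DataFrame

// the meta-data is fetched through the cache, so it is only re-read after the TTL expires
val joinWithMetaData: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  val metadataDF = DataFrameCache.getMetaData
  batchDf.join(metadataDF, Seq("colX"), "left_outer").show(false)
}

streamingDf.writeStream
  .foreachBatch(joinWithMetaData)
  .start()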

Upvotes: 0
