shiv455

Reputation: 7784

Refresh a DataFrame in Spark real-time streaming without stopping the process

In my application I get a stream of accounts from a Kafka queue (using Spark Streaming with Kafka).

I need to fetch attributes related to these accounts from S3, so I'm planning to cache the resulting S3 DataFrame. For now the S3 data will not be updated more than once a day, but that might change to every hour or every 10 minutes very soon. So the question is: how can I refresh the cached DataFrame periodically without stopping the process?

Update: I'm planning to publish an event to Kafka whenever there is an update in S3, using SNS and AWS Lambda. My streaming application will subscribe to the event and refresh the cached DataFrame when it arrives (basically unpersist() the cache and reload from S3). Is this a good approach?

Upvotes: 3

Views: 3258

Answers (2)

The simplest way to achieve this: the code below re-reads the dimension data folder on every batch. Keep in mind that new dimension values (country names in my case) have to arrive as a new file.

package com.databroccoli.streaming.dimensionupateinstreaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object RefreshDimensionInStreaming {

  def main(args: Array[String]) = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    // Latest dimension snapshot; replaced on every micro-batch.
    var countryDf: Option[DataFrame] = None

    def updateDimensionDf(): Unit = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")

      // Release the previous snapshot (has no effect if it was never cached).
      countryDf.foreach(_.unpersist())

      // Rename the overlapping columns so the join condition is unambiguous.
      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))

      countryDf.get.show()
    }

    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)

        updateDimensionDf()

        batchDF
          .join(
            countryDf.get,
            expr("countrycode_2 = countrycode"),
            "leftOuter"
          )
          .show

      }
      .start()
      .awaitTermination()

  }

}
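
If re-reading the dimension folder on every batch turns out to be too expensive, one option is to reload only after a time limit has passed or after an update event has arrived, as the question's update proposes. The sketch below is a minimal, Spark-independent illustration of that idea, not a tested production pattern. `Refreshable`, `ttlMillis`, `release`, `now` and `invalidate` are hypothetical names introduced here and are not part of Spark's API.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper (not a Spark class): holds a value and reloads it when
// it is older than ttlMillis or when invalidate() has been called.
final class Refreshable[T](
    load: () => T,                       // how to build a fresh value
    ttlMillis: Long,                     // maximum age before a reload
    release: T => Unit = (_: T) => (),   // cleanup for the old value
    now: () => Long = () => System.currentTimeMillis()) {

  private var current: Option[T] = None
  private var loadedAt: Long = 0L
  private val stale = new AtomicBoolean(false)

  // Call from an event listener, e.g. when an "S3 updated" message arrives.
  def invalidate(): Unit = stale.set(true)

  // Call once per micro-batch; reloads only when needed.
  def get(): T = synchronized {
    val expired = current.isEmpty || now() - loadedAt >= ttlMillis
    val invalidated = stale.getAndSet(false) // always clear the flag
    if (expired || invalidated) {
      current.foreach(release)
      current = Some(load())
      loadedAt = now()
    }
    current.get
  }
}

// Assumed usage with the code above (the one-hour TTL is illustrative):
//   val dim = new Refreshable[DataFrame](
//     load = () => spark.read.schema(schemaUntyped2).option("header", "true")
//       .csv("src/main/resources/broadcasttest/dimension").cache(),
//     ttlMillis = 60 * 60 * 1000L,
//     release = df => { df.unpersist(); () })
//   ...and inside foreachBatch: batchDF.join(dim.get(), ...)
```

The function passed to foreachBatch runs on the driver, so calling `get()` there keeps the reload decision in one place; the `now` parameter exists only so the expiry logic can be exercised without waiting.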

Upvotes: 0

plamb

Reputation: 5636

This question was recently asked on the Spark Mailing List.

As far as I know, the only way to do what you're asking is to reload the DataFrame from S3 when new data arrives, which means you have to recreate the streaming DF as well and restart the query. This is because DataFrames are fundamentally immutable.

If you want to update (mutate) data in a DataFrame without reloading it, you need to try one of the datastores that integrate with or connect to Spark and allow mutations. One that I'm aware of is SnappyData.

Upvotes: 5
