Tobi

Reputation: 31479

How to correctly use checkpointing to solve the "lineage issues" in Spark Streaming applications

I'm having some "lineage problems" when running a Spark Streaming application, which loads historical data from Elasticsearch on startup, and updates this data with data coming from Apache Kafka messages.

I posted a question some time ago because, after a while, my application ends up in a kind of deadlock: the time needed to calculate the results becomes longer than the streaming window, which stalls the stream.

See

The recommendation was to use checkpointing, which I tried. Still, the problem remains the same.

Here's my current sample code:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    System.setProperty("hadoop.home.dir", "D:\\Development\\fake-hadoop-for-spark")

    val checkpointDirectory = "D:/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet
    var offsetRanges = Array[OffsetRange]()

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContext
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history : DataFrame = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")

        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")

        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")

        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg.toDF()

        //As recommended, call checkpoint()
        history.rdd.checkpoint()

        //Save to Elasticsearch
        history.saveToEs("data/salesaggregation", Map("es.mapping.id" -> "userId"))

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Is there a flaw in my thinking somewhere? The code itself runs without errors, yet the problem remains.

I also tried to use updateStateByKey, but as a Spark and Scala beginner I'm struggling with it: from what I understand it only works on pair DStreams, and I have a number of fields to update for each userId.

EDIT:

Following Holden's answer, I added history.explain(true) after the aggregation statement. The output shows that the lineage grows with every iteration:

First iteration:

== Parsed Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
     Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
      Subquery history
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
    Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
     Subquery sales
      Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Analyzed Logical Plan ==
userId: bigint, latestSaleTimestamp: timestamp, totalRevenue: double, totalPoints: bigint
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
     Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
      Subquery history
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
    Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
     Subquery sales
      Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Optimized Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
  Union
   Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
    LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
   Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
    Project [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L]
     Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133

== Physical Plan ==
Repartition 4, false
 Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#34) AS latestSaleTimestamp#13,CombineSum(PartialSum#35) AS totalRevenue#14,CombineSum(PartialSum#36L) AS totalPoints#15L]
  Exchange (HashPartitioning 200)
   Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS PartialMax#34,SUM(totalRevenue#28) AS PartialSum#35,SUM(totalPoints#29L) AS PartialSum#36L]
    Union
     Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
      PhysicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
     Aggregate false, [userId#12L], [userId#12L,CAST(MAX(PartialMax#40L), TimestampType) AS latestSaleTimestamp#25,CombineSum(PartialSum#41) AS totalRevenue#26,CombineSum(PartialSum#42L) AS totalPoints#27L]
      Exchange (HashPartitioning 200)
       Aggregate true, [userId#12L], [userId#12L,MAX(saleTimestamp#9L) AS PartialMax#40L,SUM(totalRevenue#11) AS PartialSum#41,SUM(totalPoints#10L) AS PartialSum#42L]
        PhysicalRDD [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L], MapPartitionsRDD[6] at foreachRDD at ReadFromKafkaAndES.scala:51

Code Generation: false
== RDD ==

Second iteration:

== Parsed Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#13,totalRevenue#14,totalPoints#15L]
     Subquery history
      Repartition 4, false
       Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
        Subquery a
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
            Subquery history
             LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
           Subquery sales
            Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
    Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
     Subquery sales
      Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Analyzed Logical Plan ==
userId: bigint, latestSaleTimestamp: timestamp, totalRevenue: double, totalPoints: bigint
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Subquery a
   Union
    Project [userId#4L,latestSaleTimestamp#13,totalRevenue#14,totalPoints#15L]
     Subquery history
      Repartition 4, false
       Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
        Subquery a
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           Project [userId#4L,latestSaleTimestamp#0,totalRevenue#3,totalPoints#2]
            Subquery history
             LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
           Subquery sales
            Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
    Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
     Subquery sales
      Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Optimized Logical Plan ==
Repartition 4, false
 Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS latestSaleTimestamp#147,SUM(totalRevenue#14) AS totalRevenue#148,SUM(totalPoints#15L) AS totalPoints#149L]
  Union
   Repartition 4, false
    Aggregate [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS latestSaleTimestamp#13,SUM(totalRevenue#28) AS totalRevenue#14,SUM(totalPoints#29L) AS totalPoints#15L]
     Union
      Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
       LogicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
      Aggregate [userId#12L], [userId#12L,CAST(MAX(saleTimestamp#9L), TimestampType) AS latestSaleTimestamp#25,SUM(totalRevenue#11) AS totalRevenue#26,SUM(totalPoints#10L) AS totalPoints#27L]
       Project [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L]
        Relation[lineItems#5,otherRevenue#6,productList#7,productRevenue#8,saleTimestamp#9L,totalPoints#10L,totalRevenue#11,userId#12L] org.apache.spark.sql.json.JSONRelation@eecc133
   Aggregate [userId#146L], [userId#146L,CAST(MAX(saleTimestamp#143L), TimestampType) AS latestSaleTimestamp#159,SUM(totalRevenue#145) AS totalRevenue#160,SUM(totalPoints#144L) AS totalPoints#161L]
    Project [userId#146L,saleTimestamp#143L,totalRevenue#145,totalPoints#144L]
     Relation[lineItems#139,otherRevenue#140,productList#141,productRevenue#142,saleTimestamp#143L,totalPoints#144L,totalRevenue#145,userId#146L] org.apache.spark.sql.json.JSONRelation@eecc133

== Physical Plan ==
Repartition 4, false
 Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#166) AS latestSaleTimestamp#147,CombineSum(PartialSum#167) AS totalRevenue#148,CombineSum(PartialSum#168L) AS totalPoints#149L]
  Exchange (HashPartitioning 200)
   Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#13) AS PartialMax#166,SUM(totalRevenue#14) AS PartialSum#167,SUM(totalPoints#15L) AS PartialSum#168L]
    Union
     Repartition 4, false
      Aggregate false, [userId#4L], [userId#4L,MAX(PartialMax#172) AS latestSaleTimestamp#13,CombineSum(PartialSum#173) AS totalRevenue#14,CombineSum(PartialSum#174L) AS totalPoints#15L]
       Exchange (HashPartitioning 200)
        Aggregate true, [userId#4L], [userId#4L,MAX(latestSaleTimestamp#0) AS PartialMax#172,SUM(totalRevenue#28) AS PartialSum#173,SUM(totalPoints#29L) AS PartialSum#174L]
         Union
          Project [userId#4L,latestSaleTimestamp#0,CAST(totalRevenue#3, DoubleType) AS totalRevenue#28,CAST(totalPoints#2, LongType) AS totalPoints#29L]
           PhysicalRDD [latestSaleTimestamp#0,productList#1,totalPoints#2,totalRevenue#3,userId#4L], MapPartitionsRDD[1] at createDataFrame at EsSparkSQL.scala:28
          Aggregate false, [userId#12L], [userId#12L,CAST(MAX(PartialMax#178L), TimestampType) AS latestSaleTimestamp#25,CombineSum(PartialSum#179) AS totalRevenue#26,CombineSum(PartialSum#180L) AS totalPoints#27L]
           Exchange (HashPartitioning 200)
            Aggregate true, [userId#12L], [userId#12L,MAX(saleTimestamp#9L) AS PartialMax#178L,SUM(totalRevenue#11) AS PartialSum#179,SUM(totalPoints#10L) AS PartialSum#180L]
             PhysicalRDD [userId#12L,saleTimestamp#9L,totalRevenue#11,totalPoints#10L], MapPartitionsRDD[45] at foreachRDD at ReadFromKafkaAndES.scala:51
     Aggregate false, [userId#146L], [userId#146L,CAST(MAX(PartialMax#196L), TimestampType) AS latestSaleTimestamp#159,CombineSum(PartialSum#197) AS totalRevenue#160,CombineSum(PartialSum#198L) AS totalPoints#161L]
      Exchange (HashPartitioning 200)
       Aggregate true, [userId#146L], [userId#146L,MAX(saleTimestamp#143L) AS PartialMax#196L,SUM(totalRevenue#145) AS PartialSum#197,SUM(totalPoints#144L) AS PartialSum#198L]
        PhysicalRDD [userId#146L,saleTimestamp#143L,totalRevenue#145,totalPoints#144L], MapPartitionsRDD[46] at foreachRDD at ReadFromKafkaAndES.scala:51

Code Generation: false
== RDD ==

So it seems the checkpointing does not have the desired effect, and I still can't figure out how to fix this. Thanks for any hints!

Upvotes: 0

Views: 3217

Answers (2)

Patrick McGloin

Reputation: 2234

You are checkpointing each RDD that comes in every 20 seconds. How many RDDs is that? It could be dozens. Checkpointing writes the dataset to the filesystem, which rather defeats one of Spark's main selling points: in-memory processing.

Things which I think you should consider:

  • Don't checkpoint every iteration; instead, do so every 5-10 iterations.
  • Reconsider calling checkpoint within foreachRDD. Perhaps try to create the new aggregates within a map function, then call coalesce(1) / repartition(1) on the output RDD to reduce the resulting aggregates down to one RDD partition. Then checkpoint that every 5-10 sliding intervals.

Doing those two things should reduce the amount of file-system interaction dramatically.
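
As a rough illustration of the first suggestion, here is a minimal sketch of checkpointing only on every Nth batch. CheckpointSchedule and shouldCheckpoint are hypothetical names made up for this example (they are not part of Spark), and the interval of 5 is only an assumed value from the 5-10 range mentioned above.

```scala
// Hypothetical helper, not part of Spark: decides on which batches to checkpoint.
object CheckpointSchedule {
  // True on every `every`-th batch; batch numbering is assumed to start at 1.
  def shouldCheckpoint(batchNumber: Long, every: Int): Boolean = {
    require(every > 0, "every must be positive")
    batchNumber > 0 && batchNumber % every == 0
  }
}

// Sketch of use inside foreachRDD (batchNumber is an assumed driver-side counter):
//   batchNumber += 1
//   if (CheckpointSchedule.shouldCheckpoint(batchNumber, 5)) someRdd.checkpoint()
```

Since the body of foreachRDD runs on the driver, a plain var counter declared outside the closure should be enough to track the batch number.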

Upvotes: 0

Holden

Reputation: 7452

I'd recommend looking into updateStateByKey some more and considering flatmapping the updates (so each input record can create multiple possible updates).
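
To make this more concrete, here is a minimal sketch of an update function that carries several fields per key. The case classes SaleUpdate and UserState and the object SalesState are hypothetical names invented for this example; the field names simply mirror the columns in the question. The function is plain Scala, so it can be tried without a cluster.

```scala
// Hypothetical types for illustration; field names mirror the question's columns.
final case class SaleUpdate(saleTimestamp: Long, totalRevenue: Double, totalPoints: Long)
final case class UserState(latestSaleTimestamp: Long, totalRevenue: Double, totalPoints: Long)

object SalesState {
  // Has the shape updateStateByKey expects: (Seq[V], Option[S]) => Option[S].
  def update(newSales: Seq[SaleUpdate], previous: Option[UserState]): Option[UserState] =
    if (newSales.isEmpty) previous // nothing new for this key: keep the old state
    else {
      val start = previous.getOrElse(UserState(0L, 0.0, 0L))
      // Fold all new events for this key into the running aggregate.
      Some(newSales.foldLeft(start) { (state, sale) =>
        UserState(
          math.max(state.latestSaleTimestamp, sale.saleTimestamp),
          state.totalRevenue + sale.totalRevenue,
          state.totalPoints + sale.totalPoints)
      })
    }
}
```

Assuming the Kafka messages have been parsed into a DStream[(Long, SaleUpdate)] keyed by userId (called pairs here purely for illustration), the call would look like pairs.updateStateByKey(SalesState.update _). The value of a pair can be any type, so a case class is one way to keep several fields per userId.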

One interesting thing is that you're checkpointing the RDD of the history DataFrame, but when you register it as a temp table it always uses the logical plan, so your checkpointing might not be doing anything. I'm not sure, though, and it's close to 1am here, so I'd add an explain to your debugging to see what the lineage ends up looking like.
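
If the logical plan really is what keeps growing, one possible workaround (an assumption on my part, not something verified against this application) is to checkpoint the row RDD and then build a fresh DataFrame from it, so that the new DataFrame's plan starts at the checkpointed RDD. Lineage.truncate is a hypothetical helper name; the Spark calls used are RDD.checkpoint, RDD.count and SQLContext.createDataFrame(rowRDD, schema).

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper for illustration; assumes a checkpoint directory is already set.
object Lineage {
  def truncate(sqlContext: SQLContext, df: DataFrame): DataFrame = {
    val rows = df.rdd
    rows.checkpoint() // must be requested before any job runs on this RDD
    rows.count()      // run a job so the checkpoint is actually written
    // The new DataFrame's plan begins at `rows` instead of the nested aggregations.
    sqlContext.createDataFrame(rows, df.schema)
  }
}
```

In the question's code this would replace the two lines after the aggregation, roughly history = Lineage.truncate(sqlContext, agg). Running explain(true) on the result should show whether the nested Union/Aggregate levels are gone.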

Upvotes: 1
