Dyin

Reputation: 5376

Spark DStream: periodically calling saveAsObjectFile using transform does not work as expected

I read data from Kafka using the DirectKafkaStream API [1], apply some transformations, update a count, and then write the data back to Kafka. This piece of code actually runs in a test:

kafkaStream[Key, Value]("test")
      .map(record => (record.key(), 1))
      .updateStateByKey[Int](
        (numbers: Seq[Int], state: Option[Int]) =>
          state match {
            case Some(s) => Some(s + numbers.length)
            case _ => Some(numbers.length)
          }
      )
      .checkpoint(this)("count") {
        case (save: (Key, Int), current: (Key, Int)) =>
          (save._1, save._2 + current._2)
      }
      .map(_._2)
      .reduce(_ + _)
      .map(count => (new Key, new Result[Long](count.toLong)))
      .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)

The checkpoint operator is an extension to the DStream API that I created. It saves the RDD of the given DStream for a single Time into HDFS using saveAsObjectFile. In practice, it saves the result of every 60th micro-batch (RDD) into HDFS.

Checkpoint does the following:

def checkpoint(processor: Streaming)(name: String)(
    mergeStates: (T, T) => T): DStream[T] = {
  val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(processor.getClass) + "/" + name + "/"
  logInfo(s"Checkpoint base path is [$path].")

  processor.registerOperator(name)

  if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
    val restorePath = path + processor.restorationPoint.get.ID.stringify
    logInfo(s"Restoring from path [$restorePath].")
    checkpointData = context.objectFile[T](restorePath).cache()

    stream
      .transform((rdd: RDD[T], time: Time) => {
        val merged = rdd
          .union(checkpointData)
          .map[(Boolean, T)](record => (true, record))
          .reduceByKey(mergeStates)
          .map[T](_._2)

        processor.maybeCheckpoint(name, merged, time)

        merged
      })
  } else {
    stream
      .transform((rdd: RDD[T], time: Time) => {
        processor.maybeCheckpoint(name, rdd, time)

        rdd
      })
  }
}

The effective piece of code is the following:

dstream.transform((rdd: RDD[T], time: Time) => {
      processor.maybeCheckpoint(name, rdd, time)

      rdd
    })

Here, the dstream variable is the result of the previous operator, updateStateByKey, so transform is called right after updateStateByKey.

def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
  if (doCheckpoint(time)) {
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
    val newPath = configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  }
}

As you can see, most of the code is just bookkeeping; the only operation with a real effect is the call to saveAsObjectFile.

The problem is that, even though the RDDs resulting from updateStateByKey should be persisted automatically, when saveAsObjectFile is called on every Xth micro-batch, Spark recomputes everything from scratch, from the beginning of the streaming job, starting by reading everything from Kafka again. I have tried forcing cache or persist with different storage levels, on the DStreams as well as on the RDDs.

[Image: micro-batches]

[Image: DAG for job 22]

[Images: DAG for the job that runs saveAsObjectFile (SAOF1, SAOF2)]

What could be the problem?

Thanks!

[1] Using Spark 2.1.0.

Upvotes: 8

Views: 598

Answers (1)

ImDarrenG

Reputation: 2345

I believe using transform to periodically checkpoint will cause unexpected cache behaviour.

Using foreachRDD instead to perform the periodic checkpointing should let the DAG remain stable enough for RDDs to be cached effectively.

I'm almost positive that was the solution to a similar issue we had a while ago.
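As a rough illustration of that suggestion, here is a minimal sketch of moving the periodic save out of transform and into foreachRDD. The names PeriodicSave, isDue, saveEvery, batchInterval, every and basePath are hypothetical and not part of the question's code; the bookkeeping done by maybeCheckpoint is left out, and the sketch assumes the batch times are aligned to multiples of the batch interval.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, for illustration only.
object PeriodicSave {

  // True when `time` falls on a multiple of (batchInterval * every),
  // i.e. roughly on every `every`-th micro-batch.
  def isDue(time: Time, batchInterval: Duration, every: Int): Boolean =
    time.isMultipleOf(batchInterval * every)

  // Registers the save as a separate output operation (foreachRDD)
  // instead of a side effect inside transform, and returns the stream
  // unchanged so that downstream operators can keep using it.
  def saveEvery[T](
      stream: DStream[T],
      batchInterval: Duration,
      every: Int,
      basePath: String): DStream[T] = {
    stream.foreachRDD { (rdd: RDD[T], time: Time) =>
      if (isDue(time, batchInterval, every)) {
        // One directory per saved batch, named by the batch time in ms.
        rdd.saveAsObjectFile(s"$basePath/${time.milliseconds}")
      }
    }
    stream
  }
}
```

With this shape, the stream produced by updateStateByKey would be passed to saveEvery and then continue into the remaining map/reduce steps, so the save becomes its own output operation rather than part of the lineage of the downstream RDDs.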

Upvotes: 3
