Reputation: 5376
I read data from Kafka using the DirectKafkaStream API [1], do some transformations, update a count, and then write the data back to Kafka. This piece of code actually lives in a test:
kafkaStream[Key, Value]("test")
  .map(record => (record.key(), 1))
  .updateStateByKey[Int](
    (numbers: Seq[Int], state: Option[Int]) =>
      state match {
        case Some(s) => Some(s + numbers.length)
        case _       => Some(numbers.length)
      }
  )
  .checkpoint(this)("count") {
    case (save: (Key, Int), current: (Key, Int)) =>
      (save._1, save._2 + current._2)
  }
  .map(_._2)
  .reduce(_ + _)
  .map(count => (new Key, new Result[Long](count.toLong)))
  .toKafka(Key.Serializer.getClass.getName,
    Result.longKafkaSerializer.getClass.getName)
The checkpoint operator is an enrichment of the DStream API that I've created, which saves one RDD of the given DStream, for one Time, into HDFS using saveAsObjectFile. In practice it saves the result of every 60th micro-batch (RDD) into HDFS.
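For context, the operator is attached to DStream roughly like this (a simplified sketch with placeholder types, not the exact code):

import scala.reflect.ClassTag

import org.apache.spark.streaming.dstream.DStream

// Simplified sketch: the checkpoint operator is exposed as an enrichment of
// DStream through an implicit class. `Streaming` is the processor trait used
// below; its definition is omitted here.
object DStreamCheckpointing {
  implicit class CheckpointableDStream[T: ClassTag](stream: DStream[T]) {
    def checkpoint(processor: Streaming)(name: String)(
        mergeStates: (T, T) => T): DStream[T] = {
      // body shown below
      ???
    }
  }
}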
Checkpoint does the following:
def checkpoint(processor: Streaming)(name: String)(
    mergeStates: (T, T) => T): DStream[T] = {
  val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(processor.getClass) + "/" + name + "/"
  logInfo(s"Checkpoint base path is [$path].")
  processor.registerOperator(name)
  if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
    // Restoration branch: load the saved checkpoint and merge it into every micro-batch.
    val restorePath = path + processor.restorationPoint.get.ID.stringify
    logInfo(s"Restoring from path [$restorePath].")
    checkpointData = context.objectFile[T](restorePath).cache()
    stream
      .transform((rdd: RDD[T], time: Time) => {
        val merged = rdd
          .union(checkpointData)
          .map[(Boolean, T)](record => (true, record))
          .reduceByKey(mergeStates)
          .map[T](_._2)
        processor.maybeCheckpoint(name, merged, time)
        merged
      })
  } else {
    // Regular branch: pass the RDD through and checkpoint it on every Xth micro-batch.
    stream
      .transform((rdd: RDD[T], time: Time) => {
        processor.maybeCheckpoint(name, rdd, time)
        rdd
      })
  }
}
The effective piece of code is the following:
dstream.transform((rdd: RDD[T], time: Time) => {
  processor.maybeCheckpoint(name, rdd, time)
  rdd
})
The dstream variable in the code above is the result of the previous operator, updateStateByKey, so transform is called right after updateStateByKey.
def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
  if (doCheckpoint(time)) {
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
    val newPath = configuration.get[String](
      "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
      Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  }
}
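doCheckpoint isn't shown here; roughly, it fires on every 60th micro-batch, something along these lines (illustrative values, not the exact code):

import org.apache.spark.streaming.Time

// Illustrative only: fire on every 60th micro-batch, assuming a fixed batch interval.
val batchIntervalMs = 5000L   // assumed batch interval, not the real configuration
val checkpointEvery = 60L

def doCheckpoint(time: Time): Boolean =
  (time.milliseconds / batchIntervalMs) % checkpointEvery == 0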
As you can see, most of the code is just bookkeeping, but saveAsObjectFile is effectively called.
The problem is that even though the resulting RDDs from updateStateByKey should be persisted automatically, when saveAsObjectFile is called on every Xth micro-batch, Spark recomputes everything from scratch, from the beginning of the streaming job, starting by reading everything from Kafka again. I've tried to force cache or persist with different storage levels, on the DStreams as well as on the RDDs.
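Roughly, the forced caching attempts looked like this (a sketch with illustrative names, not the exact code):

import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

// Sketch of the kind of forced caching that was attempted: persisting both the
// DStream produced by updateStateByKey and its individual RDDs.
def forcePersist[T: ClassTag](stateStream: DStream[T]): DStream[T] = {
  stateStream.persist(StorageLevel.MEMORY_AND_DISK)   // on the DStream
  stateStream.transform { rdd =>
    rdd.persist(StorageLevel.MEMORY_AND_DISK)          // on each micro-batch RDD
  }
}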
Micro-batches:
DAG for job 22:
DAG for the job that runs saveAsObjectFile:
What could be the problem?
Thanks!
[1] Using Spark 2.1.0.
Upvotes: 8
Views: 598
Reputation: 2345
I believe using transform to periodically checkpoint will cause unexpected cache behaviour.
Using foreachRDD to perform the periodic checkpointing instead will allow the DAG to remain stable enough to effectively cache RDDs.
I'm almost positive that was the solution to a similar issue we had a while ago.
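A minimal sketch of what I mean (illustrative names, not your exact API):

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Do the periodic saveAsObjectFile as a side effect in foreachRDD rather than
// inside transform, so the checkpointing no longer rewrites the stream's lineage.
def periodicCheckpoint[T](stream: DStream[T],
                          shouldCheckpoint: Time => Boolean,
                          pathFor: Time => String): Unit =
  stream.foreachRDD { (rdd, time) =>
    if (shouldCheckpoint(time)) rdd.saveAsObjectFile(pathFor(time))
  }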
Upvotes: 3