Soumitra

Reputation: 612

How to build a lookup map in Spark Streaming?

What is the best way to maintain application state in a Spark Streaming application?

I know of two ways:

  1. Use the "union" operation to append to the lookup RDD and persist it after each union.
  2. Save the state in a file or database and load it at the start of each batch.

From a performance perspective, which one is better? Also, is there a better way to do this?
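A plain-Scala model of option 1 may help frame the comparison (Seq stands in for an RDD here; this is not Spark code, and the object and method names are hypothetical). Spark's RDD.union keeps duplicate elements rather than merging them by key, so appending every batch grows the lookup by a full batch each time, whereas a keyed update keeps one entry per key.

```scala
// Plain-Scala model: Seq stands in for an RDD of (key, value) pairs (not Spark code)
object UnionVsKeyed {
  type Batch = Seq[(Int, String)]

  // Option 1: union-style concatenation; duplicates are kept, so the lookup grows every batch
  def appendAll(batches: Seq[Batch]): Seq[(Int, String)] =
    batches.foldLeft(Seq.empty[(Int, String)])((lookup, batch) => lookup ++ batch)

  // Keyed update: one entry per key, the latest value for a key wins
  def latestByKey(batches: Seq[Batch]): Map[Int, String] =
    batches.foldLeft(Map.empty[Int, String])((lookup, batch) => lookup ++ batch)

  def main(args: Array[String]): Unit = {
    val batch: Batch = (0 to 9).map(n => (n, (n % 2).toString))
    val batches = Seq.fill(3)(batch)
    println(appendAll(batches).size)   // 30
    println(latestByKey(batches).size) // 10
  }
}
```

With a real RDD the appended version would also need a reduceByKey (or similar) to collapse duplicates before it is usable as a lookup map.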

Upvotes: 4

Views: 1530

Answers (1)

Jacek Laskowski

Reputation: 74619

You should really be using mapWithState(spec: StateSpec[K, V, StateType, MappedType]), which keeps per-key state across batches, as follows:

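// assumes spark-shell, where the SparkContext sc is predefined
import org.apache.spark.streaming.{ StreamingContext, Seconds, State, StateSpec, Time }
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))

// checkpointing is mandatory for mapWithState
ssc.checkpoint("_checkpoints")

// (key, value) pairs: 0 -> "0", 1 -> "1", 2 -> "0", ...
val rdd = sc.parallelize(0 to 9).map(n => (n, (n % 2).toString))
// emits the same RDD in every batch
val sessions = new ConstantInputDStream(ssc, rdd)

// called for each (key, value) record in a batch; state holds what was stored for that key so far
val updateState = (batchTime: Time, key: Int, value: Option[String], state: State[Int]) => {
  println(s">>> batchTime = $batchTime")
  println(s">>> key       = $key")
  println(s">>> value     = $value")
  println(s">>> state     = $state")
  // running sum: length of the incoming value plus the previously stored sum (0 if none)
  val sum = value.getOrElse("").length + state.getOption().getOrElse(0)
  state.update(sum)
  Some((key, value, sum)) // mapped value
}
val spec = StateSpec.function(updateState)
val mappedStatefulStream = sessions.mapWithState(spec)

mappedStatefulStream.print()

// nothing is computed until the streaming context is started; awaitTermination blocks
ssc.start()
ssc.awaitTermination()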
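To see what the stream should print, here is a plain-Scala model of how the per-key state evolves under the same update rule as updateState (this is not Spark's implementation; the object and method names are hypothetical). Because ConstantInputDStream feeds the same ten pairs into every batch and every value is a one-character string, each key's sum should grow by one per batch.

```scala
// Plain-Scala model of the per-key state in the example above (not Spark's implementation)
object MapWithStateModel {
  // Same rule as updateState: new sum = value length + previously stored sum (0 if none)
  def step(state: Map[Int, Int], batch: Seq[(Int, String)]): Map[Int, Int] =
    batch.foldLeft(state) { case (acc, (key, value)) =>
      acc.updated(key, value.length + acc.getOrElse(key, 0))
    }

  // The constant input stream replays the same pairs in every batch
  def run(batches: Int): Map[Int, Int] = {
    val batch = (0 to 9).map(n => (n, (n % 2).toString))
    (1 to batches).foldLeft(Map.empty[Int, Int])((state, _) => step(state, batch))
  }

  def main(args: Array[String]): Unit =
    println(run(3)) // every key maps to 3
}
```

The model also shows why no reload is needed: the state map is carried from one batch to the next, which is what mapWithState does for you (with checkpointing for fault tolerance).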

Upvotes: 6
