Vibhuti

Reputation: 1634

Spark Streaming stateful transformation mapWithState function getting error java.util.NoSuchElementException: None.get

I want to replace my updateStateByKey function with mapWithState (Spark 1.6) to improve the performance of my program.

I was following these two documents:

https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-spark-streaming.html

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Streaming%20mapWithState.html

but I am getting the error scala.MatchError: [Ljava.lang.Object:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 4 times, most recent failure: Lost task 0.3 in stage 71.0 (TID 88, ttsv-lab-vmdb-01.englab.juniper.net): scala.MatchError: [Ljava.lang.Object;@eaf8bc8 (of class [Ljava.lang.Object;)
at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
at HbaseCovrageStream$$anonfun$HbaseCovrageStream$$tracketStateFunc$1$3.apply(HbaseCoverageStream_mapwithstate.scala:84)
at scala.Option.flatMap(Option.scala:170)
at HbaseCovrageStream$.HbaseCovrageStream$$tracketStateFunc$1(HbaseCoverageStream_mapwithstate.scala:84)

Reference code:

def trackStateFunc(key: String, value: Option[Array[Long]], current: State[Seq[Array[Long]]]): Option[Array[Long]] = {
  /* adding current state to the previous state */
  val res = value.map(x => x +: current.getOption().get).orElse(current.getOption())
  current.update(res.get)
  res.flatMap {
    case as: Seq[Array[Long]] => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption // throws match error
  }
}

val statespec: StateSpec[String, Array[Long], Array[Long], Option[Array[Long]]] = StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], Option[Array[Long]]] = parsedStream.mapWithState(statespec)

My previous working code which was using updateStateByKey function:

val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })

Upvotes: 1

Views: 2177

Answers (2)

Vibhuti

Reputation: 1634

Thanks, Igor. I changed my trackStateFunc and it's working now.

For reference, my working code with mapWithState:

import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.MapWithStateDStream

// Note: value.get assumes a value is always present. If a timeout is set on the
// StateSpec, value can be None when a key times out, and value.get would throw.
def trackStateFunc(batchTime: Time, key: String, value: Option[Array[Long]], state: State[Array[Long]])
  : Option[(String, Array[Long])] = {
  // Check if state exists
  if (state.exists) {
    // Element-wise sum of the previous state and the incoming value
    val newState: Array[Long] = Array(state.get, value.get).transpose.map(_.sum)
    state.update(newState)    // Set the new state
    Some((key, newState))
  } else {
    val initialState = value.get
    state.update(initialState) // Set the initial state
    Some((key, initialState))
  }
}

// StateSpec[KeyType, ValueType, StateType, MappedType]
val stateSpec: StateSpec[String, Array[Long], Array[Long], (String, Array[Long])] = StateSpec.function(trackStateFunc _)

val state: MapWithStateDStream[String, Array[Long], Array[Long], (String, Array[Long])] = parsedStream.mapWithState(stateSpec)
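
The line Array(state.get, value.get).transpose.map(_.sum) adds the two arrays position by position. Below is a small standalone sketch of that idiom in plain Scala (no Spark), with made-up numbers. The name sumByTranspose is introduced here only for illustration, and the sketch assumes both arrays have the same length.

```scala
object ElementwiseSumSketch {
  // Hypothetical helper (not from the original post): stack the two arrays
  // as rows, transpose them into columns, then sum each column.
  // Assumes a and b have the same length.
  def sumByTranspose(a: Array[Long], b: Array[Long]): Array[Long] =
    Array(a, b).transpose.map(_.sum)

  def main(args: Array[String]): Unit = {
    val previous = Array(1L, 2L, 3L)    // stands in for state.get
    val incoming = Array(10L, 20L, 30L) // stands in for value.get
    println(sumByTranspose(previous, incoming).mkString(",")) // prints 11,22,33
  }
}
```

This does with plain collection operations what the updateStateByKey version in the question did with the BDV reduce.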

Upvotes: 1

Igor Berman

Reputation: 1532

Your problem might be the case where the value is absent: you wrap the state in Some, and then you have to match on it. Alternatively, you can use state.getOption (check the example in the link you attached once more).
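
To illustrate the absent-value case: calling .get on an empty Option throws java.util.NoSuchElementException: None.get, which is likely what happens when current.getOption().get runs for a key that has no state yet. Below is a minimal sketch in plain Scala (no Spark) that handles every combination without calling .get. Here merge is a hypothetical helper, not part of the Spark API, and its two arguments stand in for value and state.getOption.

```scala
import scala.util.Try

object OptionMergeSketch {
  // Hypothetical helper, not part of Spark: `value` stands in for the incoming
  // record and `previous` for what state.getOption would return.
  def merge(value: Option[Array[Long]], previous: Option[Array[Long]]): Option[Array[Long]] =
    (value, previous) match {
      // Both present: add element-wise (zip stops at the shorter array).
      case (Some(v), Some(p)) => Some(p.zip(v).map { case (x, y) => x + y })
      // First record for this key: the value becomes the state.
      case (Some(v), None) => Some(v)
      // No new value: keep whatever state exists (possibly none).
      case (None, existing) => existing
    }

  def main(args: Array[String]): Unit = {
    // Calling .get on an empty Option throws NoSuchElementException("None.get").
    println(Try(Option.empty[Array[Long]].get).isFailure) // prints true

    println(merge(Some(Array(1L, 2L)), None).map(_.mkString(",")))                  // prints Some(1,2)
    println(merge(Some(Array(1L, 2L)), Some(Array(10L, 20L))).map(_.mkString(","))) // prints Some(11,22)
    println(merge(None, None).map(_.mkString(",")))                                 // prints None
  }
}
```

Inside a real mapWithState function, the result of such a merge would still have to be written back with state.update, and only when there is something to store.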

Upvotes: 1
