banjara

Reputation: 3890

Spark Streaming share state between two streams

Can we share Spark Streaming state between two DStreams?

Basically, I want to create and update state using the first stream, and enrich the second stream using that state.

Example: I have modified the StatefulNetworkWordCount example. I create the state from the first stream and enrich the second stream with the counts from the first stream.

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)

  Some(output)
}

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)

  Some(output)
}



// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)



// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)

In the checkpoint directory, I can see two different state RDDs.

I am using Spark 1.6.1 and Kafka 0.8.2.1.

Upvotes: 4

Views: 853

Answers (2)

maasg

Reputation: 37435

It's possible to access the state maintained by the DStream that results from applying the mapWithState operation by calling stateSnapshots() on it. This returns a DStream of (key, state) pairs.

So, inspired by your example:

val firstDStream = ???
val secondDStream = ???
val firstDStreamSMapped = firstDStream.mapWithState(...)
val firstStreamState = firstDStreamSMapped.stateSnapshots()
// We want to use the state of stream 1 to enrich stream 2. The keys of both streams must match.
val enrichedStream = secondDStream.join(firstStreamState)
// ... do stuff with enrichedStream ...
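
For a fuller picture, here is a minimal sketch of the whole pipeline, assuming Spark 1.6 and using `stateSnapshots()`, the method that `MapWithStateDStream` exposes for reading its state as a DStream. To keep it small, two socket text streams stand in for the Kafka streams in the question. The host, the ports, the checkpoint path and the object name are placeholders of mine, not part of the original code. It also uses `leftOuterJoin` rather than `join`, so that words with no state yet are kept with a count of 0; that is a design choice for the sketch, not a requirement.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object SharedStateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("SharedStateSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // mapWithState requires a checkpoint directory (hypothetical path)
    ssc.checkpoint("/tmp/shared-state-sketch")

    // Placeholder sources standing in for the two Kafka streams
    val firstWords = ssc.socketTextStream("localhost", 9998)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
    val secondWords = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Keep a running count per word, driven by the first stream only
    val updateCount = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    // One (word, count) pair per key currently held in state, emitted every batch
    val firstStreamState = firstWords
      .mapWithState(StateSpec.function(updateCount))
      .stateSnapshots()

    // Enrich the second stream with the first stream's counts;
    // words unknown to the first stream get a count of 0
    val enriched = secondWords
      .leftOuterJoin(firstStreamState)
      .map { case (word, (_, count)) => (word, count.getOrElse(0)) }

    enriched.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The join is evaluated per batch, so each batch of the second stream sees the snapshot of the first stream's state produced for that same batch interval. Only one `mapWithState` is involved, so there is a single state RDD rather than two independent ones.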

Upvotes: 2

bug_xshguo

Reputation: 1

This method may be helpful for you:

    ssc.union(Seq[DStream[T]])

Upvotes: -1
