This question is a follow-up to "State management not serializable".
I want to encapsulate the state management logic.
Here is where I am right now:
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream
class StateManager(
    stream: DStream[(String, String)],
    updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}
object StateManager {
  def apply(
      _dStream: DStream[(String, String)],
      _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    // forward the underscore-prefixed parameters (dStream/updateState are not defined here)
    new StateManager(_dStream, _updateState)
}
This works fine, but it only handles DStream[(String, String)]. That is a first step towards generic state management that can accept any DStream, from DStream[(Int, String)] to DStream[(String, myCustomClass)].
myState has to be a value (not a method) for this to work, because of serialization.
But I run into a problem: in Scala, type parameters cannot be applied to function values.
user6910411 gave me a hint (Type-parameterize a DStream): use ClassTags together with an enclosing method. But then the state would still come from a method, not a value.
Does anyone know how to overcome these difficulties?
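To make the limitation concrete, here is a minimal pure-Scala sketch (no Spark involved; FunctionVsMethod, describe, describeStringInt and Describer are hypothetical names chosen for illustration). In Scala 2 a method can declare its own type parameters but a function value cannot, so a function value only exists once its types are fixed, either directly or by an enclosing class.

```scala
object FunctionVsMethod {
  // A method can declare its own type parameters ...
  def describe[K, V](key: K, value: Option[V]): String =
    s"$key -> ${value.getOrElse("none")}"

  // ... but in Scala 2 a function *value* cannot; this does not compile:
  //   val describeF[K, V]: (K, Option[V]) => String = ...
  // A function value is only possible once K and V are fixed:
  val describeStringInt: (String, Option[Int]) => String =
    (k, v) => describe(k, v)

  // Workaround: move the type parameters to an enclosing class. Each instance
  // then holds an ordinary function value whose types are fixed by the class.
  class Describer[K, V] {
    val describeF: (K, Option[V]) => String = (k, v) => describe(k, v)
  }
}
```

Moving the type parameters from the function to its enclosing class is what lets a field such as myState remain a value while still being generic.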
The context:
Spark 1.6
Spark Graph:
object Consumer_Orchestrator {
  def main(args: Array[String]): Unit = {
    // set up configurations
    val streamingEnvironment = StreamingEnvironment(/*configurations*/)
    val kafkaStream = streamingEnvironment.stream()
    val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
    val initialState = emptyRDD
    val stateManager = StateManager(kafkaStream, updateStateFunction)
    val state: DStream[(String, String)] = stateManager.myState
    state.foreachRDD(_.foreach(println))
    // the StreamingContext lives inside the StreamingEnvironment
    streamingEnvironment.streamingContext.start()
    streamingEnvironment.streamingContext.awaitTermination()
  }
}
The StreamingEnvironment class, which creates the streaming context:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
  val sparkContext = SparkContext.getOrCreate(sparkConf)
  lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))
  streamingContext.checkpoint(/*directory checkpoint*/)
  streamingContext.remember(Minutes(1))
  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)
  def stop() = sparkContext.stop()
}
object StreamingEnvironment {
  def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
  }
}
Answer:
Here you are:
App.scala:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream
import statemanager._
object App {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "generic", new SparkConf())
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/chk")
    StateManager(
      new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b", 2)))),
      (_: String, _: Option[Int], _: State[Int]) => Option(1)
    ).myState.print
    ssc.start()
    ssc.awaitTermination()
  }
}
StateManage.scala:
package statemanager
import scala.reflect.ClassTag
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream
class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)],
    updateStateFunction: (T, Option[U], State[V]) => Option[W]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}
object StateManager {
  def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
      _dStream: DStream[(T, U)],
      _updateState: (T, Option[U], State[V]) => Option[W]
  ) =
    new StateManager(_dStream, _updateState)
}
build.sbt:
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
Directory structure:
├── App.scala
├── build.sbt
└── StateManage.scala
Example execution:
sbt run
...
-------------------------------------------
Time: 1483701790000 ms
-------------------------------------------
1
1
...
As you can see, there is no magic here: if you introduce generic type parameters, you need ClassTags for them in the same context.
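A Spark-free sketch of the same rule, assuming only the Scala standard library (ClassTagDemo and ArrayHolder are hypothetical names). Seq#toArray needs an implicit ClassTag for its element type, much as mapWithState needs ClassTags for its state and mapped types (and the implicit conversion to PairDStreamFunctions needs them for the key and value types). So the generic class must declare the context bound, and its apply factory must repeat it in order to supply the tag.

```scala
import scala.reflect.ClassTag

object ClassTagDemo {
  // Stand-in for StateManager: the context bound puts a ClassTag[T] in scope,
  // which items.toArray requires in order to build an Array[T].
  class ArrayHolder[T: ClassTag](items: Seq[T]) {
    lazy val asArray: Array[T] = items.toArray
  }

  object ArrayHolder {
    // The factory must repeat the bound: it passes its own ClassTag[T] on to
    // the constructor. Without ": ClassTag" here, this line fails to compile
    // because no ClassTag[T] is in scope.
    def apply[T: ClassTag](items: Seq[T]): ArrayHolder[T] = new ArrayHolder(items)
  }
}
```

At the call site the concrete types (Int, String, ...) are known, so the compiler supplies the tags automatically, just as it does for StateManager in App.scala.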