Reputation: 581
I was wondering if it was possible to maintain radically different states across an application? For example, have the update function of the first state call the one from the second state?
I do not recall coming across any such example, nor did I find any contraindication... Based on the example from https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html, I know of no reason why I wouldn't be able to have different trackStateFuncs with different States, and still update those thanks to their Key, as shown below:
def firstTrackStateFunc(batchTime: Time,
                        key: String,
                        value: Option[Int],
                        state: State[Long]): Option[(String, Long)] = {
  val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
  val output = (key, sum)
  state.update(sum)
  Some(output)
}
and
def secondTrackStateFunc(batchTime: Time,
                         key: String,
                         value: Option[Int],
                         state: State[Int]): Option[(String, Long)] = {
  // disregard problems this example would cause
  // The state is an Int here, so the default must be 0 rather than 0L
  val dif = value.getOrElse(0) - state.getOption.getOrElse(0)
  // Widen to Long to match the declared return type
  val output = (key, dif.toLong)
  state.update(dif)
  Some(output)
}
I think this is possible but still remain unsure. I would like someone to validate or invalidate this assumption...
Upvotes: 3
Views: 487
Reputation: 1397
@Yuval gave a great answer on chaining mapWithState functions. However, I have another approach. Instead of having two mapWithState calls, you can put both the sum and the diff in the same State[(Long, Int)].
In this case, you would only need one mapWithState function, in which you could update both values. Something like this:
def trackStateFunc(batchTime: Time,
                   key: String,
                   value: Option[Int],
                   state: State[(Long, Int)]): Option[(String, (Long, Int))] = {
  // Previous (sum, diff) pair, defaulting to zeros for a key seen for the first time
  val (prevSum, prevDiff) = state.getOption.getOrElse((0L, 0))
  val v = value.getOrElse(0)
  val sum = v.toLong + prevSum
  val diff = v - prevDiff
  val output = (key, (sum, diff))
  state.update((sum, diff))
  Some(output)
}
Upvotes: 1
Reputation: 149538
I was wondering if it was possible to maintain radically different states across an application?
Every call to mapWithState on a DStream[(Key, Value)] can hold one State[T] object. This T needs to be the same for every invocation of that mapWithState's state function. In order to use different states, you can either chain mapWithState calls, where one call's Option[U] output is another's input, or you can split the DStream and apply a different mapWithState call to each part. You cannot, however, access a different State[T] object from inside another call, as they are isolated from one another, and one cannot mutate the state of the other.
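To make the two options concrete, here is a minimal sketch of both chaining and splitting. The names IndependentStates, nextSum, nextDiff, sumFunc, diffFunc, chained and split are made up for illustration, and the sum/diff logic is just borrowed from the question. It assumes the Spark Streaming 1.6+ API (StateSpec.function and mapWithState) and that checkpointing is enabled on the StreamingContext, which mapWithState needs.

```scala
import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; none of these names come from Spark itself.
object IndependentStates {

  // Pure update rules, kept apart from Spark so they can be checked in isolation.
  def nextSum(value: Option[Int], previous: Option[Long]): Long =
    value.getOrElse(0).toLong + previous.getOrElse(0L)

  def nextDiff(value: Option[Long], previous: Option[Long]): Long =
    value.getOrElse(0L) - previous.getOrElse(0L)

  // First state function: its State[Long] holds the running sum per key.
  def sumFunc(batchTime: Time,
              key: String,
              value: Option[Int],
              state: State[Long]): Option[(String, Long)] = {
    val sum = nextSum(value, state.getOption)
    state.update(sum)
    Some((key, sum))
  }

  // Second state function: it gets its own, separate State[Long] per key
  // and never sees the state owned by sumFunc.
  def diffFunc(batchTime: Time,
               key: String,
               value: Option[Long],
               state: State[Long]): Option[(String, Long)] = {
    val diff = nextDiff(value, state.getOption)
    state.update(diff)
    Some((key, diff))
  }

  // Option 1: chain the calls. The (key, sum) pairs emitted by the first
  // mapWithState become the input of the second one.
  def chained(input: DStream[(String, Int)]): DStream[(String, Long)] = {
    val sums = input.mapWithState(StateSpec.function(sumFunc _))
    sums.mapWithState(StateSpec.function(diffFunc _))
  }

  // Option 2: split. Apply two unrelated mapWithState calls to the same
  // source stream; each keeps its own state.
  def split(input: DStream[(String, Int)])
      : (DStream[(String, Long)], DStream[(String, Long)]) = {
    val sums = input.mapWithState(StateSpec.function(sumFunc _))
    val diffs = input
      .mapValues(_.toLong)
      .mapWithState(StateSpec.function(diffFunc _))
    (sums, diffs)
  }
}
```

In both variants the only thing that flows from one state function to the other is the emitted output; neither function can read or update the other's State object.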
Upvotes: 2