wipman

Reputation: 581

Handle different states

I was wondering if it was possible to maintain radically different states across an application? For example, have the update function of the first state call the one from the second state?

I do not recall seeing any such example, nor did I find anything that rules it out. Based on the example from https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html, I see no reason why I couldn't have different trackStateFuncs with different States and still update each of them by key, as shown below:

def firstTrackStateFunc(batchTime: Time, 
                        key: String, 
                        value: Option[Int], 
                        state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    state.update(sum)
    Some(output)
}

and

def secondTrackStateFunc(batchTime: Time, 
                         key: String, 
                         value: Option[Int], 
                         state: State[Int]): Option[(String, Int)] = {
    // disregard problems this example would cause
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0)
    val output = (key, dif)
    state.update(dif)
    Some(output)
}

I think this is possible, but I remain unsure. Could someone confirm or refute this assumption?

Upvotes: 3

Views: 487

Answers (2)

Sohum Sachdev

Reputation: 1397

@Yuval gave a great answer on chaining mapWithState functions. However, I have another approach. Instead of having two mapWithState calls, you can put both the sum and the diff in the same State[(Long, Int)].

In this case, you would only need one mapWithState function, in which you can update both values. Something like this:

def trackStateFunc(batchTime: Time, 
                   key: String, 
                   value: Option[Int], 
                   state: State[(Long, Int)]): Option[(String, (Long, Int))] =
{
    // Read both previous values at once; default to zero for a key seen for the first time
    val (prevSum, prevDiff) = state.getOption.getOrElse((0L, 0))
    val sum = value.getOrElse(0).toLong + prevSum
    val diff = value.getOrElse(0) - prevDiff
    val output = (key, (sum, diff))
    state.update((sum, diff))
    Some(output)
}

Upvotes: 1

Yuval Itzchakov

Reputation: 149538

I was wondering if it was possible to maintain radically different states across an application?

Each call to mapWithState on a DStream[(Key, Value)] works with a single State[T], and T must be the same for every invocation of that call's mapping function. To use different states, you can either chain mapWithState calls, where the Option[U] output of one is the input of the next, or split the DStream and apply a different mapWithState call to each part. You cannot, however, reach one State[T] object from inside another mapping function: the states are isolated from one another, and one cannot mutate the other.
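
As a rough sketch of the two options above (chaining, and applying separate mapWithState calls to the same input), something like the following should work. The names ChainedStates, sumFunc, deltaFunc, chain and branch are made up for illustration and are not from the question; the sketch assumes the Spark 1.6 Scala API and an input stream of type DStream[(String, Int)].

```scala
import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; all names here are illustrative assumptions.
object ChainedStates {

  // Stage 1: running sum per key, stored in its own State[Long].
  def sumFunc(batchTime: Time,
              key: String,
              value: Option[Int],
              state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)
    Some((key, sum))
  }

  // Stage 2: receives the sums emitted by stage 1 and remembers the
  // previously seen sum in a separate State[Long]. It never touches
  // the state object owned by stage 1.
  def deltaFunc(batchTime: Time,
                key: String,
                sum: Option[Long],
                state: State[Long]): Option[(String, Long)] = {
    val current = sum.getOrElse(0L)
    val delta = current - state.getOption.getOrElse(0L)
    state.update(current)
    Some((key, delta))
  }

  // Chaining: the output of the first mapWithState feeds the second one.
  def chain(input: DStream[(String, Int)]): DStream[(String, Long)] = {
    val sums = input.mapWithState(StateSpec.function(sumFunc _))
    sums.mapWithState(StateSpec.function(deltaFunc _))
  }

  // Branching: two independent mapWithState calls, each with its own state.
  // The second branch converts the values to Long so deltaFunc can be reused.
  def branch(input: DStream[(String, Int)])
      : (DStream[(String, Long)], DStream[(String, Long)]) = {
    val sums = input.mapWithState(StateSpec.function(sumFunc _))
    val deltas = input
      .mapValues(_.toLong)
      .mapWithState(StateSpec.function(deltaFunc _))
    (sums, deltas)
  }
}
```

Note that mapWithState needs a checkpoint directory to be set on the StreamingContext (ssc.checkpoint(...)) before the context is started. In both variants each mapping function only ever sees its own state; the only way data moves from one to the other is through the emitted records.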

Upvotes: 2
