sparkless

Reputation: 305

Apache Flink MapState vs ValueState[Map[String, String]] usage

I key the stream and keep a map in state for each key, like this:

stream
  .keyBy(_.userId)
  .process(new MyStateFunc)

For every event, I have to read all the values under a key, calculate something, and update only a few of them. An example:

class MyStateFunc() .. {

    // the whole map is stored as a single value per key
    val state = ValueState[Map[String, String]]

    def process(event: MyModel...): Unit = {
       val stateAsMap = state.value()
       val updatedStateValues = updateAFewColumnsOfStateValByUsingIncomingEvent(event, stateAsMap)
       doCalculationByUsingSomeValuesOfState(updatedStateValues)
       state.update(updatedStateValues) // writes the entire map back
    }
    def updateAFewColumnsOfStateValByUsingIncomingEvent(event, state): Map[String, String] = {
      // collect the changed entries, then overlay them on the existing state
      val updatedState = scala.collection.mutable.Map.empty[String, String]
      event.foreach { case (status, newValue) =>
        updatedState.put(status, newValue)
      }
      state ++ updatedState
    }
    def doCalculationByUsingSomeValuesOfState(stateValues): Map[String, String] = {
      // do some stuff by using some keys and values
    }
}

I'm not sure this is the most efficient way. I do have to read all the values (or at least some of them) to do the calculation, but I only need to update a few of them, not the whole Map stored under each key. Which one is more efficient: ValueState[Map[String, String]] or MapState[String, String]?

If I use MapState[String, String], I have to do something like the following to update the affected keys:

    val state = MapState[String, String]
    def process(event: MyModel...): Unit = {
       val stateAsMap = state.entries().asScala
       // one put per changed entry
       event.foreach { case (status, newValue) =>
         state.put(status, newValue)
       }
    }

I'm not sure whether updating the state once per changed entry is efficient. What about a single call instead:

mapState.putAll(changeEvents)

Will this only overwrite related keys instead of all of them?

Or is there another way to handle this?

Upvotes: 0

Views: 1077

Answers (1)

kkrugler

Reputation: 9245

If your state only has a few entries, then it likely doesn't matter much. If your map can have a significant number of entries, then using MapState (with the RocksDB state backend) should significantly cut down on the serialization cost, as you're only updating a few entries versus the entire state.
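
On the `putAll` question: `MapState.putAll` takes a `java.util.Map` and copies its mappings into the state, so keys that are absent from the argument are left alone. A minimal sketch of that contract, using a plain `java.util.HashMap` as a stand-in for the per-key state (the object name `PutAllSketch` and the sample keys are made up for illustration, not taken from the question):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object PutAllSketch {
  // Stand-in for the contents of one key's MapState. In a Flink job this
  // would be a MapState[String, String] obtained from the runtime context.
  def initialState(): JMap[String, String] = {
    val m = new JHashMap[String, String]()
    m.put("status", "active")
    m.put("plan", "free")
    m.put("region", "eu")
    m
  }

  // Overlay only the changed entries; untouched keys keep their old values.
  def applyChanges(state: JMap[String, String], changes: JMap[String, String]): Unit =
    state.putAll(changes)
}
```

Applying a change map that contains only `plan` leaves `status` and `region` as they were. Since `putAll` expects a `java.util.Map`, a Scala map such as `changeEvents` would need converting with `.asJava` first.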

Note that for efficiency you should iterate over the MapState once, doing your calculation and (occasionally) updating the entry, assuming that's possible.
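
The single-pass idea might look roughly like the sketch below. It is written against `java.util.Iterator[java.util.Map.Entry[String, String]]`, which is the type `MapState.iterator()` returns, so it can be tried with a plain `HashMap`. The names `SinglePassSketch` and `updateAndCompute` are hypothetical, and summing value lengths is only a placeholder for the real calculation. As far as I know, calling `setValue` on an entry from the RocksDB-backed iterator writes the change through to the state, but that is worth verifying against your Flink version.

```scala
import java.util.{Iterator => JIterator, Map => JMap}

object SinglePassSketch {
  // Walks the entries once. Each value is folded into the result, and only
  // entries named in `changes` with a different value are rewritten.
  // Returns the total length of all values after the update.
  def updateAndCompute(
      entries: JIterator[JMap.Entry[String, String]],
      changes: Map[String, String]): Int = {
    var total = 0
    while (entries.hasNext) {
      val entry = entries.next()
      val current = changes.get(entry.getKey) match {
        case Some(v) if v != entry.getValue =>
          entry.setValue(v) // write only the entry that actually changed
          v
        case _ => entry.getValue
      }
      total += current.length
    }
    total
  }
}
```

In a process function this would be called as `SinglePassSketch.updateAndCompute(state.iterator(), changes)`. Keys in `changes` that are not yet in the state are not visited by the loop, so they would still need a separate `put`.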

Upvotes: 2
