user6134689
user6134689

Reputation:

Can I store data in RAM with Apache Spark?

I would like to know if it is possible to store a bunch of strings, for example, in RAM with Apache Spark. Indeed, I want to query and update these strings depending the new input data that Apache Spark is treating. Futhermore, if it is possible, can a node notify all other nodes which strings are stored ? If you need information about my projet, feel free to ask.

J

Upvotes: 0

Views: 332

Answers (1)

ImDarrenG
ImDarrenG

Reputation: 2345

Yes, you need the stateful streaming function mapWithState. This function allows you to update state cached in memory across streaming batches.

Note that you will need to enable checkpointing if you haven't already done so.

Scala example usage:

def stateUpdateFunction(userId: UserId,
                        newData: UserAction,
                        stateData: State[UserSession]): UserModel = {
    val currentSession = stateData.get()    // Get current session data
    val updatedSession = ...            // Compute updated session using newData
    stateData.update(updatedSession)            // Update session data     
    val userModel = ...                 // Compute model using updatedSession
    return userModel                // Send model downstream
}

// Stream of user actions, keyed by the user ID
val userActions = ...  // stream of key-value tuples of (UserId, UserAction)
// Stream of data to commit
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))

https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html

Java example usage:

// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(String word, Optional<Integer> one,
          State<Integer> state) {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
      }
    };

// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java line 90:

Upvotes: 1

Related Questions