Tom

Flink broadcast stream and broadcast state

Regarding broadcast streams and broadcast state, I have two questions.

  1. KeyedBroadcastProcessFunction#Context has the following method, getBroadcastState. Why does it take a MapStateDescriptor for the broadcast state? I would have expected broadcast state to use something like a BroadcastStateDescriptor, with MapStateDescriptor reserved for MapState.

public abstract <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor);

  2. The DataStream#broadcast method is defined as follows:
  def broadcast(broadcastStateDescriptors: MapStateDescriptor[_, _]*): BroadcastStream[T] = {
    if (broadcastStateDescriptors == null) {
      throw new NullPointerException("State Descriptors must not be null.")
    }
    javaStream.broadcast(broadcastStateDescriptors: _*)
  }

What is the broadcastStateDescriptors argument used for? Why do I have to provide it this early, when broadcasting the stream? It seems I could instead create the descriptor and get the broadcast state only when I need it, in KeyedBroadcastProcessFunction#processBroadcastElement, where the operator receives the broadcast element and updates the broadcast state.

Answers (1)

David Anderson

  1. Map state is the kind of state (and the only kind of state) that Flink supports for broadcasting. Since broadcast state is always a key/value map, a MapStateDescriptor is what you use to work with it.

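  2. Flink needs to know, up front, which broadcast states exist and how to serialize their contents, so that the downstream operator can create (or restore from a checkpoint) those states before any elements arrive. broadcastStateDescriptors is how DataStream#broadcast supplies this information. Calling getBroadcastState with a descriptor that was not passed to broadcast(...) fails with an IllegalArgumentException.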
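A small plain-Java model may help illustrate why the descriptors are needed before processing starts. This is not Flink code: MapDescriptor, BroadcastOperator and BroadcastStateModel are hypothetical names, and the model assumes the operator creates one empty map per descriptor it is given at startup and rejects any descriptor it was not given. It needs no Flink dependency.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastStateModel {

    // Hypothetical stand-in for Flink's MapStateDescriptor: a state name plus the
    // key and value types (the real descriptor also carries serializers).
    record MapDescriptor<K, V>(String name, Class<K> keyType, Class<V> valueType) {}

    // Hypothetical stand-in for the operator that runs a broadcast process function.
    static final class BroadcastOperator {
        private final Map<String, Map<?, ?>> states = new HashMap<>();

        // Every descriptor handed to broadcast(...) gets its (empty) map state
        // created here, before any element is processed.
        BroadcastOperator(List<? extends MapDescriptor<?, ?>> descriptors) {
            for (MapDescriptor<?, ?> d : descriptors) {
                states.put(d.name(), new HashMap<Object, Object>());
            }
        }

        // Looks up a state that was registered up front; unknown descriptors are rejected.
        @SuppressWarnings("unchecked")
        <K, V> Map<K, V> getBroadcastState(MapDescriptor<K, V> descriptor) {
            Map<?, ?> state = states.get(descriptor.name());
            if (state == null) {
                throw new IllegalArgumentException(
                        "State '" + descriptor.name() + "' was not registered in broadcast(...)");
            }
            return (Map<K, V>) state;
        }
    }

    public static void main(String[] args) {
        MapDescriptor<String, Integer> rules =
                new MapDescriptor<>("rules", String.class, Integer.class);
        BroadcastOperator op = new BroadcastOperator(List.of(rules));

        // Broadcast side: the state already exists, so it can be written immediately.
        op.getBroadcastState(rules).put("threshold", 10);
        System.out.println(op.getBroadcastState(rules).get("threshold")); // prints 10

        // A descriptor created lazily and never passed to broadcast(...) is rejected.
        MapDescriptor<String, Integer> lazy =
                new MapDescriptor<>("lazy", String.class, Integer.class);
        try {
            op.getBroadcastState(lazy);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that creating a descriptor inside the function is fine as long as it matches one that was registered; in the model, a descriptor with the same name finds the existing map.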
