I'm trying to do some state management inside the processBroadcastElement() function:
final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])
override def processBroadcastElement(...): Unit = {
  val actvTagMap = getRuntimeContext.getMapState(actvTagsMapValue)
  val st = actvTagMap.entries() // this line produces an error
}
I get the following error when accessing the state:
229797 [LabelShlfEvents -> Sink: Print to Std. Out (1/1)] WARN
o.a.flink.runtime.taskmanager.Task - LabelShlfEvents -> Sink: Print to Std. Out (1/1)
(d3154841fd8bd4cabc00e0145ac37ed8) switched from RUNNING to FAILED.
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
Am I not allowed to do this?
That doesn't work, for the reason explained by Dominik in his answer.
What you can do in processBroadcastElement is access, modify, or delete the keyed state for all keys held by this parallel instance, by calling applyToKeyedState with a KeyedStateFunction. However, you must take care to behave deterministically across all parallel instances. Otherwise, after recovery or rescaling you could end up with inconsistencies.
Here's an example that emits the values of a ValueState for every key upon receiving any broadcast message.
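// Imports for the enclosing file. TaxiRide is the ride POJO from the Flink training exercises; its import is omitted.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public static class DumpFunction
        extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {
    private ValueStateDescriptor<TaxiRide> taxiDesc;
    private ValueState<TaxiRide> taxiState;

    @Override
    public void open(Configuration config) {
        taxiDesc = new ValueStateDescriptor<>("ride", TaxiRide.class);
        taxiState = getRuntimeContext().getState(taxiDesc);
    }

    @Override
    public void processElement(TaxiRide ride, ReadOnlyContext ctx,
            Collector<TaxiRide> out) throws Exception {
        // Keyed side: the state is scoped to the key of the current ride.
        taxiState.update(ride);
    }

    @Override
    public void processBroadcastElement(String msg, Context ctx,
            Collector<TaxiRide> out) throws Exception {
        // Broadcast side: there is no current key, so visit every key this instance holds.
        ctx.applyToKeyedState(taxiDesc, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
            @Override
            public void process(Long taxiId, ValueState<TaxiRide> state) throws Exception {
                out.collect(state.value());
            }
        });
    }
}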
You can't do this. The reason is quite simple: MapState (but also ValueState, ListState and more, as described here) is a type of state called keyed state. This state is partitioned and scoped to the key of the current input element.
Broadcast elements are neither keyed nor partitioned in any way, so there is no keyed context attached to them. When you try to access keyed state inside processBroadcastElement, Flink has no idea which key the request is scoped to, which is why you get the exception.
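A toy model in plain Java can make this concrete. It is a sketch, not Flink's implementation or API: the classes ToyKeyedValueState and ToyKeyedStateDemo and their methods are hypothetical. Every access goes through a "current key" that the keyed side sets; on the broadcast side none is set, so the lookup fails with the same message as in the question, while iterating over all keys explicitly (roughly what applyToKeyedState offers) still works.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Toy model, NOT Flink's implementation: one value state whose storage is
// partitioned by key. Reads and writes are implicitly scoped to the "current key".
class ToyKeyedValueState<K, V> {
    private final Map<K, V> perKey = new HashMap<>();
    private K currentKey; // null means "no keyed context"

    // Keyed side: the runtime sets the element's key before calling processElement.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Broadcast side: broadcast elements carry no key, so none is set.
    void clearCurrentKey() {
        this.currentKey = null;
    }

    V value() {
        return perKey.get(requireKey());
    }

    void update(V value) {
        perKey.put(requireKey(), value);
    }

    // Rough analogue of applyToKeyedState: visit every key held locally,
    // passing the key explicitly instead of relying on a current key.
    void forEachKey(BiConsumer<K, V> fn) {
        perKey.forEach(fn);
    }

    private K requireKey() {
        // Same message as the NullPointerException in the question's stack trace.
        return Objects.requireNonNull(currentKey,
                "No key set. This method should not be called outside of a keyed context.");
    }
}

class ToyKeyedStateDemo {
    public static void main(String[] args) {
        ToyKeyedValueState<Long, String> rides = new ToyKeyedValueState<>();

        // Two keyed elements: each write lands in its own key's slot.
        rides.setCurrentKey(1L);
        rides.update("ride-a");
        rides.setCurrentKey(2L);
        rides.update("ride-b");

        // A broadcast element arrives: there is no key to scope the access to.
        rides.clearCurrentKey();
        try {
            rides.value();
        } catch (NullPointerException e) {
            System.out.println("broadcast side: " + e.getMessage());
        }

        // Iterating over all keys explicitly still works.
        rides.forEachKey((key, ride) -> System.out.println(key + " -> " + ride));
    }
}
```

The point of forEachKey taking the key as an argument is that, without a current key, the only well-defined way to touch keyed state is to name each key yourself.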
On the other hand, you can safely use keyed state in processElement of a KeyedBroadcastProcessFunction, because those elements have a key assigned, so the scope of the keyed state is known.
If you need state for broadcast elements that is not broadcast state, you need to implement it as operator state, as described in the documentation.
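When broadcast state does fit, as it seems to for the question's MapStateDescriptor[String, List[String]], Flink gives processBroadcastElement read-write access to it through ctx.getBroadcastState(descriptor) and gives processElement read-only access through its ReadOnlyContext; the descriptor should be the same one passed to broadcast(...) when the broadcast stream is created. The plain-Java toy below (hypothetical classes ToyBroadcastInstance and ToyBroadcastDemo, not Flink's API) sketches why no key is needed: each parallel instance keeps one non-keyed map, every instance receives every broadcast element, and the copies stay identical only because each update depends on nothing but the element itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Flink's API: one parallel instance of a keyed broadcast operator.
// Its broadcast state is a single non-keyed map, so no current key is needed.
class ToyBroadcastInstance {
    private final Map<String, List<String>> activeTags = new HashMap<>();

    // Broadcast side: read-write access, analogous to ctx.getBroadcastState(...)
    // in processBroadcastElement. The update depends only on the element itself,
    // so every instance that sees the same elements ends up with the same map.
    void processBroadcastElement(String tag, String value) {
        activeTags.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    // Keyed side: read-only view, analogous to the ReadOnlyContext in processElement.
    List<String> lookup(String tag) {
        return Collections.unmodifiableList(
                activeTags.getOrDefault(tag, Collections.emptyList()));
    }

    Map<String, List<String>> snapshot() {
        return Collections.unmodifiableMap(activeTags);
    }
}

class ToyBroadcastDemo {
    public static void main(String[] args) {
        // Two parallel instances; the broadcast stream delivers every element to both.
        List<ToyBroadcastInstance> instances =
                Arrays.asList(new ToyBroadcastInstance(), new ToyBroadcastInstance());
        for (ToyBroadcastInstance instance : instances) {
            instance.processBroadcastElement("shelf-1", "tag-a");
            instance.processBroadcastElement("shelf-1", "tag-b");
        }
        // Deterministic updates keep the per-instance copies identical.
        System.out.println(instances.get(0).snapshot().equals(instances.get(1).snapshot()));
        System.out.println(instances.get(0).lookup("shelf-1"));
    }
}
```

If an update depended on something local to one instance (a clock, a random number, the keys that instance happens to hold), the copies would diverge, which is the inconsistency risk after recovery or rescaling.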