Sergey Postument

Reputation: 177

Access flink state inside processBroadcastElement function

I want to do some state management inside the processBroadcastElement() function:

final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])

override def processBroadcastElement(...): Unit = {
    val actvTagMap = getRuntimeContext.getMapState(actvTagsMapValue)
    val st = actvTagMap.entries() // this line produces an error
}

I get the following error when accessing the state:

229797 [LabelShlfEvents -> Sink: Print to Std. Out (1/1)] WARN  o.a.flink.runtime.taskmanager.Task - LabelShlfEvents -> Sink: Print to Std. Out (1/1) (d3154841fd8bd4cabc00e0145ac37ed8) switched from RUNNING to FAILED.
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

Am I not allowed to do this?

Upvotes: 1

Views: 1082

Answers (2)

David Anderson

Reputation: 43524

That doesn't work, for the reason explained by Dominik in his answer.

What you can do in processBroadcastElement is to access/modify/delete the keyed state for all keys, by using applyToKeyedState with a KeyedStateFunction. However, you must take care to behave deterministically across all parallel instances. Otherwise, after recovery or rescaling you could end up with inconsistencies.

Here's an example that emits the values of a ValueState for every key upon receiving any broadcast message.

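import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by taxi id (Long): TaxiRide events on the keyed side, String messages on the broadcast side.
public static class DumpFunction
        extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {
    private ValueStateDescriptor<TaxiRide> taxiDesc;
    private ValueState<TaxiRide> taxiState;

    @Override
    public void open(Configuration config) {
        taxiDesc = new ValueStateDescriptor<>("ride", TaxiRide.class);
        // The same descriptor is reused in applyToKeyedState below.
        taxiState = getRuntimeContext().getState(taxiDesc);
    }

    @Override
    public void processElement(TaxiRide ride, ReadOnlyContext ctx,
            Collector<TaxiRide> out) throws Exception {
        // Keyed side: a current key is set, so this only touches the state of this taxi.
        taxiState.update(ride);
    }

    @Override
    public void processBroadcastElement(String msg, Context ctx,
            Collector<TaxiRide> out) throws Exception {
        // Broadcast side: no current key, so visit every key that has state for taxiDesc
        // on this parallel instance.
        ctx.applyToKeyedState(taxiDesc, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
            @Override
            public void process(Long taxiId, ValueState<TaxiRide> state) throws Exception {
                out.collect(state.value());
            }
        });
    }
}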
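The determinism requirement is easier to see in a stripped-down model. The sketch below is a hypothetical, Flink-free simulation: the class and method names are invented for illustration and are not Flink's API, and a simple modulo stands in for Flink's key-group assignment. Each parallel instance holds the state for the keys routed to it, every instance receives the same broadcast message, and the per-key update depends only on that key's old value and the message, so the merged result is the same whatever the parallelism.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model, NOT Flink's API: one parallel instance of a keyed
// broadcast operator, holding the keyed state for the keys routed to it.
class ToyParallelInstance {
    // key -> running count (stands in for a ValueState<Integer> per key)
    final Map<Long, Integer> countByKey = new TreeMap<>();

    // Keyed side: the element's key selects which state entry is touched.
    void processElement(long key) {
        countByKey.merge(key, 1, Integer::sum);
    }

    // Broadcast side: visit every key this instance holds, roughly what
    // applyToKeyedState does. The new value depends only on the old value
    // and the message (no clocks, randomness, or instance-local counters).
    void processBroadcastElement(int cap) {
        countByKey.replaceAll((key, count) -> Math.min(count, cap));
    }
}

class ToyRescalingCheck {
    // Route each element to one instance by key (modulo is a stand-in for
    // key groups), broadcast the same message to all instances, merge states.
    static Map<Long, Integer> run(long[] keys, int cap, int parallelism) {
        ToyParallelInstance[] instances = new ToyParallelInstance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new ToyParallelInstance();
        }
        for (long key : keys) {
            instances[(int) Math.floorMod(key, (long) parallelism)].processElement(key);
        }
        Map<Long, Integer> merged = new TreeMap<>();
        for (ToyParallelInstance instance : instances) {
            instance.processBroadcastElement(cap);
            merged.putAll(instance.countByKey);
        }
        return merged;
    }
}
```

If the broadcast rule instead read something instance-specific (a per-instance counter, the system clock), the outcome for a key could depend on which instance happened to hold it, which is the kind of divergence that could show up after rescaling or recovery.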

Upvotes: 2

Dominik Wosiński

Reputation: 3864

You can't do this. The reason is quite simple: MapState (as well as ValueState, ListState, and the other types listed in the Flink documentation) is a type of state called keyed state. This state is partitioned and scoped to the key of the current input element.

Broadcast elements are neither keyed nor partitioned in any way, so there is no keyed context attached to them. When you try to access keyed state inside processBroadcastElement, Flink has no idea which key the request is scoped to, which is why you get an exception.

On the other hand, you can safely use keyed state in processElement of KeyedBroadcastProcessFunction, because those elements have a key assigned, so the scope of the keyed state is known.
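A minimal, Flink-free model of this scoping may help. It is hypothetical: the class and its methods are invented for illustration and are not Flink's API. State is a map from key to value, reads and writes go through a "current key" that is set only while a keyed element is being processed, and the broadcast path never sets a key, so it fails with the same message as the stack trace in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical toy model, NOT Flink's API: keyed state is one value per key,
// reached through a "current key" that only the keyed input path sets.
class ToyKeyedOperator {
    private final Map<String, String> stateByKey = new HashMap<>();
    private String currentKey; // null whenever no keyed element is in flight

    // Keyed input: the key is set before the state is touched.
    void processElement(String key, String value) {
        currentKey = key;
        try {
            stateByKey.put(requireKey(), value);
        } finally {
            currentKey = null;
        }
    }

    // Keyed read, scoped the same way (only here so the per-key value can be observed).
    String readForKey(String key) {
        currentKey = key;
        try {
            return stateByKey.get(requireKey());
        } finally {
            currentKey = null;
        }
    }

    // Broadcast input: nothing sets a key, so keyed access cannot be resolved.
    String processBroadcastElement(String msg) {
        return stateByKey.get(requireKey());
    }

    private String requireKey() {
        // Throws a NullPointerException with the same message as in the question.
        return Objects.requireNonNull(currentKey,
                "No key set. This method should not be called outside of a keyed context.");
    }
}
```

In real Flink the operator sets the current key before calling processElement; the toy just makes that step explicit.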

If you need state for broadcast elements other than broadcast state, you need to implement it as operator state, as described in the documentation.
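The key difference is that operator state belongs to a parallel instance as a whole rather than to a key, so no current key is needed to use it. In Flink this is typically done by implementing `CheckpointedFunction` (`snapshotState` / `initializeState`) and obtaining the state from the operator state store. The sketch below is only a hypothetical, Flink-free model of that idea; its class and methods are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Flink's API: operator state belongs to the
// parallel instance as a whole, so it can be updated without any current key.
class ToyOperatorStateInstance {
    private final List<String> seenBroadcasts = new ArrayList<>();

    // Broadcast input: no key is needed to update instance-scoped state.
    void processBroadcastElement(String msg) {
        seenBroadcasts.add(msg);
    }

    // Loosely analogous to snapshotState: copy the state out for persistence.
    List<String> snapshot() {
        return new ArrayList<>(seenBroadcasts);
    }

    // Loosely analogous to initializeState on restore: rebuild from a snapshot.
    static ToyOperatorStateInstance restore(List<String> snapshot) {
        ToyOperatorStateInstance instance = new ToyOperatorStateInstance();
        instance.seenBroadcasts.addAll(snapshot);
        return instance;
    }
}
```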

Upvotes: 1
