victtim

Reputation: 840

In Flink, how can I access the Key when applying a Process Function on a Keyed Stream?

In Flink, I have a keyed stream to which I am applying a Process Function.

myDataStream
  .keyBy(new MyKeySelector())
  .process(new FooBarProcessFunction());

My Key Selector looks something like this...

public class MyKeySelector implements KeySelector<FooBar, FooKey> {

    @Override
    public FooKey getKey(FooBar value) {
        return new FooKey(value);
    }
}

And FooBarProcessFunction looks something like this...

public class FooBarProcessFunction extends ProcessFunction<FooBar, TransformedFooBar> {

    @Override
    public void processElement(FooBar newFooBar, Context ctx, Collector<TransformedFooBar> out) throws Exception {
        // do something with newFooBar
        // *****but I also want to know the Key (FooKey) here*****
    }
}

In FooBarProcessFunction, I would like to obtain the Key which was created by MyKeySelector's getKey method. Is that doable?

At present, I am using a workaround in which I essentially recreate the Key inside processElement, but it would be ideal if I could avoid doing so.

Upvotes: 4

Views: 3072

Answers (3)

Chandan Bansal

Reputation: 49

Inside the onTimer or processElement method of a KeyedProcessFunction:

String dataId = (String) context.getCurrentKey();
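A minimal sketch of this approach, assuming Flink 1.12 or later (for `DataStream#executeAndCollect`) and a `KeyedProcessFunction` keyed by `String`. The class names `CurrentKeyExample` and `TagWithKey`, the first-letter key, and the sample words are hypothetical. Because the key type parameter is `String`, `getCurrentKey()` needs no cast, and the `OnTimerContext` passed to `onTimer` exposes the key the timer belongs to as well.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class CurrentKeyExample {

    // Hypothetical function: the key type is String, so getCurrentKey() returns String directly.
    public static class TagWithKey extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            String key = ctx.getCurrentKey(); // key of the element being processed
            out.collect("element " + key + ":" + value);
            // One event-time timer per key: re-registering the same timestamp is deduplicated.
            ctx.timerService().registerEventTimeTimer(0L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // OnTimerContext also exposes the key the timer was registered for.
            out.collect("timer " + ctx.getCurrentKey());
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<String> results = new ArrayList<>();
        try (CloseableIterator<String> it = env
                .fromElements("apple", "avocado", "banana")
                .keyBy(s -> s.substring(0, 1), Types.STRING) // hypothetical key: first letter
                .process(new TagWithKey())
                .executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

At the end of a bounded input Flink emits a final watermark of `Long.MAX_VALUE`, which is what makes the event-time timers fire in this small example; with an unbounded source they fire only once the watermark passes the registered timestamp.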

Upvotes: 0

theShadow89

Reputation: 1549

To access the key from a process function, extend KeyedProcessFunction instead of ProcessFunction.

Your example becomes:

public class FooBarProcessFunction extends KeyedProcessFunction<FooKey, FooBar, TransformedFooBar> {

    @Override
    public void processElement(FooBar newFooBar, Context ctx, Collector<TransformedFooBar> out) throws Exception {
        // do something with newFooBar
        // the key produced by MyKeySelector is available from the context
        FooKey key = ctx.getCurrentKey();
    }
}
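Applied end to end to the question's own types, this might look like the sketch below. It assumes Flink 1.12 or later; the fields of `FooBar`, `FooKey`, and `TransformedFooBar`, the `amount * 10` transformation, and the `FooBarJob` wrapper are hypothetical fillers for code the question does not show. The key type produced by `MyKeySelector` must match the first type parameter of the `KeyedProcessFunction`.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class FooBarJob {

    // Hypothetical stand-in for the question's FooBar (POJO: public fields, no-arg constructor).
    public static class FooBar {
        public String group;
        public int amount;
        public FooBar() {}
        public FooBar(String group, int amount) { this.group = group; this.amount = amount; }
    }

    // Hypothetical key type; keys need value-based equals/hashCode.
    public static class FooKey {
        public String group;
        public FooKey() {}
        public FooKey(FooBar value) { this.group = value.group; }
        @Override public boolean equals(Object o) {
            return o instanceof FooKey && Objects.equals(group, ((FooKey) o).group);
        }
        @Override public int hashCode() { return Objects.hashCode(group); }
    }

    // Hypothetical output type.
    public static class TransformedFooBar {
        public String group;
        public int amount;
        public TransformedFooBar() {}
        public TransformedFooBar(String group, int amount) { this.group = group; this.amount = amount; }
        @Override public String toString() { return group + "=" + amount; }
    }

    public static class MyKeySelector implements KeySelector<FooBar, FooKey> {
        @Override
        public FooKey getKey(FooBar value) {
            return new FooKey(value);
        }
    }

    // First type parameter (FooKey) matches the key type produced by MyKeySelector.
    public static class FooBarProcessFunction
            extends KeyedProcessFunction<FooKey, FooBar, TransformedFooBar> {
        @Override
        public void processElement(FooBar newFooBar, Context ctx, Collector<TransformedFooBar> out) {
            FooKey key = ctx.getCurrentKey(); // the key extracted by MyKeySelector.getKey
            out.collect(new TransformedFooBar(key.group, newFooBar.amount * 10));
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        List<String> results = new ArrayList<>();
        try (CloseableIterator<TransformedFooBar> it = env
                .fromElements(new FooBar("x", 1), new FooBar("y", 2), new FooBar("x", 3))
                .keyBy(new MyKeySelector())
                .process(new FooBarProcessFunction())
                .executeAndCollect()) {
            it.forEachRemaining(t -> results.add(t.toString()));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

`FooKey` overrides `equals` and `hashCode` because Flink does not accept a POJO as a key type when it relies on `Object.hashCode()`.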

Upvotes: 2

alz2

Reputation: 206

It seems you can get the key if you window your keyed stream and apply a ProcessWindowFunction<IN, OUT, KEY, W extends Window>: the key is passed as the first argument of its process method.

There are some examples of this in the Apache Flink docs. Note that a ProcessWindowFunction on its own is inefficient, because it buffers all of a window's elements until the window fires; it should be combined with a ReduceFunction, AggregateFunction, or FoldFunction.
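A sketch of the windowed variant, assuming Flink 1.12 or later. It pairs a `ReduceFunction` with a `ProcessWindowFunction` through `WindowedStream#reduce(ReduceFunction, ProcessWindowFunction)`, so each window keeps only a running sum and the `ProcessWindowFunction` receives the key as the first argument of `process`. `countWindow(2)` is used only so that this small bounded example fires deterministically; the class names and sample tuples are hypothetical.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class WindowKeyExample {

    // Sums f1 incrementally, so each window stores a single running value.
    public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    }

    // Receives the window's key as the first argument, plus the single pre-reduced element.
    public static class EmitWithKey
            extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> {
        @Override
        public void process(String key, Context context,
                            Iterable<Tuple2<String, Integer>> sums, Collector<String> out) {
            int total = sums.iterator().next().f1;
            out.collect(key + " -> " + total);
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        List<String> results = new ArrayList<>();
        try (CloseableIterator<String> it = env
                .fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("b", 4))
                .keyBy(t -> t.f0, Types.STRING)
                .countWindow(2) // each key's window fires after 2 elements
                .reduce(new SumReducer(), new EmitWithKey())
                .executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```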

Hope this helps!

Upvotes: 0
