zhangshengxiong

Reputation: 383

How to know which subtask a local window belongs to

Is it possible to find out which subtask a local window belongs to in Flink streaming? I want to call getRuntimeContext().getIndexOfThisSubtask() from a TriggerPolicy implementation.

Upvotes: 4

Views: 272

Answers (1)

Till Rohrmann

Reputation: 13346

There is no way at the moment to obtain the index of the subtask on which the windowing operator with the TriggerPolicy is running.

However, you can work around this by adding a map operation upstream of the window that attaches the index of the current subtask to each data element.

DataStream<Tuple2<Integer, String>> ds = env.fromElements(
        new Tuple2<Integer, String>(1, "a"),
        new Tuple2<Integer, String>(2, "b"),
        new Tuple2<Integer, String>(1, "c"),
        new Tuple2<Integer, String>(2, "d"));

ds.groupBy(0)
    .map(new RichMapFunction<Tuple2<Integer,String>, Tuple3<Integer, Integer, String>>() {
        @Override
        public Tuple3<Integer, Integer, String> map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
            return new Tuple3<Integer, Integer, String>(
                getRuntimeContext().getIndexOfThisSubtask(),
                integerStringTuple2.f0,
                integerStringTuple2.f1);
        }
    })
    .window(new TestingTriggerPolicy(), new TestingEvictionPolicy())
    .mapWindow(new WindowMapFunction<Tuple3<Integer, Integer, String>, String>() {
        @Override
        public void mapWindow(Iterable<Tuple3<Integer, Integer, String>> iterable, Collector<String> collector) throws Exception {
            StringBuilder builder = new StringBuilder();

            for (Tuple3<Integer, Integer, String> element : iterable) {
                builder.append(element).append("; ");
            }

            collector.collect(builder.toString());
        }
    });
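
Once every element carries the index in its first field, a trigger policy can read it from the element instead of from the runtime context. The following is only a minimal sketch in plain Java of what that could look like. SimpleTriggerPolicy and Tagged are hypothetical stand-ins (for Flink's TriggerPolicy interface and for the Tuple3 above) so that the example compiles without Flink, and the per-subtask counting rule is just an illustration. Note also that the recorded index is that of the map operator's subtask; it should match the windowing subtask only if the data is not repartitioned between the two operators, which is assumed here.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class SubtaskAwareTriggerSketch {

    // Hypothetical stand-in for Flink's TriggerPolicy, declared locally
    // so that this sketch compiles without a Flink dependency.
    interface SimpleTriggerPolicy<T> extends Serializable {
        boolean notifyTrigger(T element);
    }

    // Hypothetical stand-in for Tuple3<Integer, Integer, String>:
    // (subtask index added by the upstream map, key, value).
    static final class Tagged implements Serializable {
        final int subtaskIndex;
        final int key;
        final String value;

        Tagged(int subtaskIndex, int key, String value) {
            this.subtaskIndex = subtaskIndex;
            this.key = key;
            this.value = value;
        }
    }

    // Illustrative policy: fires once `threshold` elements tagged with the
    // same subtask index have arrived, then resets that subtask's counter.
    static final class PerSubtaskCountTrigger implements SimpleTriggerPolicy<Tagged> {
        private final int threshold;
        private final Map<Integer, Integer> counts = new HashMap<>();

        PerSubtaskCountTrigger(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean notifyTrigger(Tagged element) {
            // The subtask index comes from the element, not the runtime context.
            int seen = counts.merge(element.subtaskIndex, 1, Integer::sum);
            if (seen >= threshold) {
                counts.put(element.subtaskIndex, 0);
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        PerSubtaskCountTrigger trigger = new PerSubtaskCountTrigger(2);
        Tagged[] input = {
            new Tagged(0, 1, "a"),
            new Tagged(1, 2, "b"),
            new Tagged(0, 1, "c"),
            new Tagged(1, 2, "d")
        };
        for (Tagged t : input) {
            System.out.println("subtask " + t.subtaskIndex + ", value " + t.value
                    + ", fired: " + trigger.notifyTrigger(t));
        }
    }
}
```

In a real job the same idea would go into the TriggerPolicy passed to window(...), reading field f0 of the Tuple3 produced by the map.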

Upvotes: 2
