elmis

Reputation: 116

How to get current iteration step in Flink's iterators?

Is it possible to access the current iteration index when performing iterations in Apache Flink 1.3.2 and Scala 2.10?

val initialData: DataSet[(ItemSet[T], Int)]

initialData.iterate(maxIterations) {
    current: DataSet[(ItemSet[T], Int)] =>
        val currentIteration = ??? // Is this accessible somehow?
        // ...
        current
}

Based on my search so far, the short answer is "No" and the slightly longer answer is that it may be possible to hack around this using Flink's raw state.
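For reference, Flink's DataSet API does give rich functions access to the superstep of a bulk iteration. `IterationRuntimeContext.getSuperstepNumber()`, reached through `getIterationRuntimeContext()`, returns the current superstep, and superstep numbers start at 1. The block below is a minimal sketch in Java. It assumes the DataSet API of the Flink 1.3 line. The class `SuperstepExample`, the function `TagWithSuperstep` and the `Tuple2<Long, Integer>` payload are hypothetical stand-ins for the `(ItemSet[T], Int)` data. From Scala, the same call should work inside a `RichMapFunction` that is used in the `iterate` step function.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.List;

public class SuperstepExample {

    // Hypothetical step function: increments f0 and records the current superstep in f1.
    // getIterationRuntimeContext() is only valid while the function runs inside an
    // iteration; outside one it throws an IllegalStateException.
    static class TagWithSuperstep
            extends RichMapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
        @Override
        public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) {
            int superstep = getIterationRuntimeContext().getSuperstepNumber(); // 1-based
            return new Tuple2<>(value.f0 + 1, superstep);
        }
    }

    // Runs a bulk iteration for maxIterations supersteps and collects the result.
    public static List<Tuple2<Long, Integer>> run(int maxIterations) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Integer>> initial = env.fromElements(new Tuple2<>(0L, 0));

        IterativeDataSet<Tuple2<Long, Integer>> loop = initial.iterate(maxIterations);
        DataSet<Tuple2<Long, Integer>> step = loop.map(new TagWithSuperstep());
        DataSet<Tuple2<Long, Integer>> result = loop.closeWith(step);

        return result.collect(); // triggers execution
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(5));
    }
}
```

No termination criterion is passed to `closeWith`, so the loop runs exactly `maxIterations` supersteps. The single collected tuple should therefore carry `maxIterations` in both fields.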

Upvotes: 2

Views: 272

Answers (1)

Amarjit Dhillon

Reputation: 2816

Solution #1: One way is to call System.out.println() inside the iteration and print the values to the console, or to write those values to a CSV file. Both options add some overhead.

Solution #2: Another way is to use a counter (an accumulator) inside a map function that increments a value. You can then read the final value from the JobExecutionResult.

Here is how it can be done in Java.


Let's say I am merging two streams with union and want to count how many events are merged in total.

DataStream<Event> mergedStream = stream1.union(stream2);

To do this, I write a map function that is called for each merged event and increments the counter with counter1.add(valueToIncrement). The counter is registered under the name "merged", which is used at the end to fetch the result.

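import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

mergedStream.map(new RichMapFunction<Event, Event>() {

            // Accumulator that counts every event passing through this operator
            private IntCounter counter1;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Use getRuntimeContext(): this map is not part of an iteration,
                // and getIterationRuntimeContext() throws outside an iteration
                this.counter1 = getRuntimeContext().getIntCounter("merged");
            }

            @Override
            public Event map(Event event) throws Exception {
                counter1.add(1);
                return event; // pass the event through instead of emitting null
            }
        });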

Now, this is how we get the results. The JobExecutionResult returned by execute() holds the accumulator values, which can be queried after the job finishes.

JobExecutionResult jobExecutionResult = environment.execute("Started the execution");

This is how we get the number of merged events:

int numberOfMergedEvents = (Integer) jobExecutionResult.getAllAccumulatorResults().get("merged");

You can also call System.out.println() inside the map to see the values on the console.

Please let me know if you have any further questions

Upvotes: 1
