Reputation: 116
Is it possible to access the current iteration index when performing iterations in Apache Flink 1.3.2 and Scala 2.10?
val initialData: DataSet[(ItemSet[T], Int)]
initialData.iterate(maxIterations) {
  current: DataSet[(ItemSet[T], Int)] =>
    val currentIteration = ??? // Is this accessible somehow?
    // ...
    current
}
Based on my search so far, the short answer is "No" and the slightly longer answer is that it may be possible to hack around this using Flink's raw state.
Upvotes: 2
Views: 272
Reputation: 2816
Solution # 1: One way is to call System.out.println() inside the iteration's step function and print the values to the console, or to write those values to a CSV file. Both approaches add some overhead.
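A minimal sketch of Solution # 1, written against the Java DataSet API (the Scala API used in the question mirrors it). Inside a bulk iteration, a RichMapFunction can obtain the current superstep number through getIterationRuntimeContext().getSuperstepNumber(), where superstep numbers start at 1, and print it. The class name, the single-element input and the "add one per superstep" step function are illustrative assumptions, not part of the original answer.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

import java.util.List;

public class PrintSuperstepExample {

    // Hypothetical step function: prints the current superstep and increments each value.
    public static class PrintingStep extends RichMapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) throws Exception {
            // getIterationRuntimeContext() is only valid inside an iteration's step function.
            int superstep = getIterationRuntimeContext().getSuperstepNumber();
            System.out.println("superstep " + superstep + ", value " + value);
            return value + 1;
        }
    }

    public static List<Integer> run(int maxIterations) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Bulk iteration over a single element, running exactly maxIterations supersteps.
        IterativeDataSet<Integer> loop = env.fromElements(0).iterate(maxIterations);
        DataSet<Integer> result = loop.closeWith(loop.map(new PrintingStep()));
        // collect() triggers execution and ships the final data set back to the client.
        return result.collect();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(3));
    }
}
```

Each superstep prints its own number, so the console shows the iteration index as it progresses; the cost is the printing overhead mentioned above.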
Solution # 2: Another way is to use a counter (an accumulator such as IntCounter) inside a map function, increment it for each record, and read its final value from the JobExecutionResult once the job has finished.
Here is how it can be done in Java.
Let's say I am merging two streams with union and want to count how many events are merged in total.
DataStream<Event> mergedStream = stream1.union(stream2);
To do this, I write a map function that is called for each merged event and increments an IntCounter with add(1).
The counter is registered under the name "merged", which is used at the end to fetch the result.
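import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

mergedStream.map(new RichMapFunction<Event, Event>() {
    private IntCounter mergedCounter;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Registers (or looks up) the accumulator named "merged".
        // getRuntimeContext() works in any job; getIterationRuntimeContext()
        // throws unless the function runs inside an iteration.
        this.mergedCounter = getRuntimeContext().getIntCounter("merged");
    }

    @Override
    public Event map(Event event) throws Exception {
        mergedCounter.add(1); // count every merged event
        return event;         // pass the event through unchanged
    }
});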
Now, this is how we get the results. Once the job has finished, execute() returns a JobExecutionResult that holds the accumulator results, which can then be queried:
JobExecutionResult jobExecutionResult = environment.execute("Started the execution");
This is how we get the number of merged events:
Integer numberOfMergedEvents = (Integer) jobExecutionResult.getAllAccumulatorResults().get("merged");
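Applied to the question's setting, a DataSet bulk iteration, the same accumulator pattern could look like the sketch below. It is a hypothetical example: the class name, the accumulator name "map-calls" and the single-element input are made up. It assumes, as far as I know, that an accumulator used inside a step function keeps accumulating across supersteps, so its value is only available from the JobExecutionResult after the whole iteration has ended, not during it.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;

public class IterationCounterExample {

    // Hypothetical step function that counts its invocations over all supersteps.
    public static class CountingStep extends RichMapFunction<Integer, Integer> {
        private IntCounter calls;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // getIntCounter creates the accumulator on first use and returns it afterwards.
            calls = getRuntimeContext().getIntCounter("map-calls");
        }

        @Override
        public Integer map(Integer value) {
            calls.add(1);
            return value + 1;
        }
    }

    public static int run(int maxIterations) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        IterativeDataSet<Integer> loop = env.fromElements(0).iterate(maxIterations);
        DataSet<Integer> result = loop.closeWith(loop.map(new CountingStep()));
        // A DataSet job needs a sink before execute(); discard the records, keep the accumulator.
        result.output(new DiscardingOutputFormat<Integer>());
        JobExecutionResult jobResult = env.execute("iteration counter");
        Integer calls = jobResult.getAccumulatorResult("map-calls");
        return calls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("map calls: " + run(3));
    }
}
```

With one input element, the counter reflects how many supersteps executed, but only after the job returns; for the index during a superstep, the runtime context's getSuperstepNumber() is the more direct route.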
You can also call System.out.println() inside the map function to see the values on the console.
Please let me know if you have any further questions.
Upvotes: 1