Vishal Goel

Reputation: 101

Possibility of saving partial outputs from bulk iteration in Flink Dataset?

I am doing an iterative computation using the Flink DataSet API, but the result of each iteration is only a part of my complete solution.
(If more detail is needed: I am computing lattice nodes level by level, from the top towards the bottom, one level per iteration; see Formal Concept Analysis.)

If I use the Flink DataSet API with a bulk iteration and do not save the intermediate results, the code looks like this:

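// ObjData, MyMapPartition, MyReduceGroup and size are not shown here.
val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
    // Compute the next lattice level from the previous one, broadcast as "concepts".
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    // First element: next partial solution; second: termination criterion (stop when empty).
    (result, result)
}
end.count()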

But if I try to write partial results inside the iteration (_.writeAsText()) or call any other action there, I get this error:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

The alternative without bulk iteration seems to be the following:

var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    count = start.count()
    all = all + count
}
println("total nodes: " + all)

But this approach is extremely slow even on the smallest input data: the iteration version takes <30 seconds, while the loop version takes >3 minutes.
I guess Flink is not able to create an optimal plan for executing the loop.

Is there any workaround I should try? Would it be possible to modify Flink so that partial results can be saved to Hadoop, etc.?

Upvotes: 2

Views: 373

Answers (1)

vasia

Reputation: 136

Unfortunately, it is not currently possible to output intermediate results from a bulk iteration. You can only output the final result at the end of the iteration.

Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.

If your intermediate results are not too big, one thing you can try is appending them to the partial solution and then outputting everything at the end of the iteration. A similar approach is implemented in the TransitiveClosureNaive example, where paths discovered in an iteration are accumulated in the next partial solution.
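
As a rough illustration of that accumulation idea, here is a minimal sketch modeled on the structure of TransitiveClosureNaive. It is not the asker's lattice computation: the halving step, the (level, value) record layout, the maxIterations value and the output path are all hypothetical stand-ins. Each record carries the level at which it was produced, the partial solution keeps every level seen so far, and the termination criterion is the set of records that are new in this round.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object AccumulateLevels {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical stand-in for the real per-level computation:
    // a record (level, value) with value > 1 yields (level + 1, value / 2).
    val start: DataSet[(Int, Int)] = env.fromElements((0, 64))
    val maxIterations = 20 // assumed upper bound on the number of levels

    val all = start.iterateWithTermination(maxIterations) { prev =>
      // Derive children from everything accumulated so far.
      val children = prev
        .filter(r => r._2 > 1)
        .map(r => (r._1 + 1, r._2 / 2))

      // Next partial solution: all earlier levels plus the new ones, deduplicated.
      val next = prev.union(children).distinct()

      // Termination criterion: records in next that were not in prev.
      // The iteration stops once this data set is empty.
      val added = prev.coGroup(next).where(0, 1).equalTo(0, 1) {
        (old, cur, out: Collector[(Int, Int)]) =>
          if (old.isEmpty) cur.foreach(r => out.collect(r))
      }

      (next, added)
    }

    // The sink sits outside the iteration, so no InvalidProgramException is raised.
    all.writeAsText("/tmp/lattice-levels") // hypothetical output path
    env.execute("accumulate levels")
  }
}
```

Note that this naive form recomputes the children of every earlier level in each round and relies on distinct() to drop the duplicates, so the partial solution grows with every level. That is why it is only reasonable when the intermediate results stay small.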

Upvotes: 3
