Reputation: 2057
I'm trying out Flink and wrote the following example program:
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala._

object IFJob {

  @SerialVersionUID(1L)
  final class StringInputFormat extends GenericInputFormat[String] {
    val N = 100
    var i = 0L

    override def reachedEnd(): Boolean = this.synchronized {
      i >= N
    }

    override def nextRecord(ot: String): String = this.synchronized {
      i += 1
      (i % 2).toString
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = env.createInput(new StringInputFormat())

    val map = text.map { (_, 1) }
    // map.print()

    val by = map.groupBy(0)
    val aggregate: AggregateDataSet[(String, Int)] = by.aggregate(Aggregations.SUM, 1)
    aggregate.print()
  }
}
I create a StringInputFormat once and read it in parallel (with the default parallelism of 8).
When I run the above program, the results vary between executions, i.e., they are not deterministic, and the counts are duplicated anywhere from 1 to 8 times.
For example, I get the following results:
// first run
(0,150)
(1,150)
// second run
(0,50)
(1,50)
// third run
(0,200)
(1,200)
The expected result would be
(0,400)
(1,400)
because the StringInputFormat should generate 8 times 50 "0" and 50 "1" records (8 parallel instances × 100 records = 800 records, half of them "0" and half "1").
I even added synchronization to the input format, but it didn't help.
What am I missing in the Flink computation model?
Upvotes: 0
Views: 112
Reputation: 18987
The behavior you observe is the result of how Flink assigns work to an InputFormat. This works as follows:

1. On the master (JobManager), the createInputSplits() method is called, which returns an array of InputSplit objects. An InputSplit is a chunk of data to read (or generate). GenericInputFormat creates one InputSplit for each parallel task instance (see the sketch below). In your case, it creates 8 InputSplit objects, and each InputSplit should generate 50 "1" and 50 "0" records.
2. The parallel DataSourceTasks are started on the workers (TaskManagers). Each DataSourceTask has its own instance of the InputFormat.
3. Once started, a DataSourceTask requests an InputSplit from the master and calls the open() method of its InputFormat with that split. When the InputFormat has finished processing the InputSplit, the DataSourceTask requests a new one from the master.

In your case, each InputSplit is processed very quickly. Hence, there is a race between the DataSourceTasks requesting InputSplits for their InputFormats, and some InputFormats process more than one InputSplit. Since an InputFormat does not reset its internal state (i.e., set i = 0) when it opens a new InputSplit, it only generates data for the first InputSplit it processes.
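For illustration, here is a simplified Scala sketch of what GenericInputFormat's createInputSplits() boils down to (the actual implementation is part of Flink's Java code and has a few extra checks):

import org.apache.flink.core.io.GenericInputSplit

// Simplified sketch, not Flink's actual source: one GenericInputSplit is
// created per parallel task instance; a split carries only its partition
// number and the total partition count, no data.
def createInputSplits(numSplits: Int): Array[GenericInputSplit] =
  Array.tabulate(numSplits)(i => new GenericInputSplit(i, numSplits))

Because the splits carry no data themselves, all 8 splits look identical to your format, which is why stale state left over from a previous split silently truncates the next one.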
You can fix this by adding the following method to the StringInputFormat:
override def open(split: GenericInputSplit): Unit = {
  super.open(split)
  // Reset the split-local counter so every InputSplit emits N records.
  i = 0
}
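Putting it all together, a minimal sketch of the corrected input format (the synchronized blocks can also be dropped because, as described above, each parallel DataSourceTask has its own InputFormat instance):

import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.core.io.GenericInputSplit

@SerialVersionUID(1L)
final class StringInputFormat extends GenericInputFormat[String] {
  val N = 100 // records per split
  var i = 0L  // split-local record counter

  // Called once per assigned InputSplit: reset the counter so that
  // every split, not just the first, emits N records.
  override def open(split: GenericInputSplit): Unit = {
    super.open(split)
    i = 0
  }

  override def reachedEnd(): Boolean = i >= N

  override def nextRecord(reuse: String): String = {
    i += 1
    (i % 2).toString
  }
}

With this change, each of the 8 splits produces 50 "0" and 50 "1" records, and the job deterministically prints (0,400) and (1,400).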
Upvotes: 1