mishka

Reputation: 2057

Race condition when generating data with a GenericInputFormat

I'm trying Flink and wrote the following example program:

import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala._

object IFJob {

  @SerialVersionUID(1L)
  final class StringInputFormat extends GenericInputFormat[String] {

    val N = 100
    var i = 0L

    override def reachedEnd(): Boolean = this.synchronized {
      i >= N
    }

    override def nextRecord(ot: String): String = this.synchronized {
      i += 1
      (i % 2) + ""
    }
  }

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = env.createInput(new StringInputFormat())

    val map = text.map {
      (_, 1)
    }
    // map.print()
    val by = map.groupBy(0)
    val aggregate: AggregateDataSet[(String, Int)] = by.aggregate(Aggregations.SUM, 1)
    aggregate.print()
  }
}

I am creating a StringInputFormat once and reading it in parallel (with the default parallelism of 8). When I run the above program, the results vary between executions, i.e., they are not deterministic: the counts come out as anywhere from 1 to 8 times 50.

For example I get the following results:

// first run 
(0,150) 
(1,150)

// second run 
(0,50) 
(1,50)

// third run 
(0,200) 
(1,200)

The expected result would be

(0,400) 
(1,400)

That is because the StringInputFormat should be read by 8 parallel instances, each generating 100 records that alternate between "0" and "1", i.e., 50 of each, for a total of 400 "0" and 400 "1" records.

I even added synchronization to the input format, but it didn't help.

What am I missing in the Flink computation model?

Upvotes: 0

Views: 112

Answers (1)

Fabian Hueske

Reputation: 18987

The behavior you observe is the result of how Flink assigns work to an InputFormat. This works as follows:

  1. On the master (JobManager), the createInputSplits() method is called, which returns an array of InputSplits. An InputSplit is a chunk of data to read (or generate). The GenericInputFormat creates one InputSplit for each parallel task instance. In your case, it creates 8 InputSplit objects, and each InputSplit should yield 50 "1" and 50 "0" records (see the sketch after this list).
  2. The parallel instances of a DataSourceTask are started on the workers (TaskManagers). Each DataSourceTask has its own instance of the InputFormat.
  3. Once started, a DataSourceTask requests an InputSplit from the master and calls the open() method of its InputFormat with that InputSplit. When the InputFormat has finished processing the InputSplit, the DataSourceTask requests a new one from the master.
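
For illustration, here is roughly what the split creation in step 1 amounts to, rendered as a Scala sketch. It is simplified, not Flink's actual source; GenericInputSplit and its (partitionNumber, totalNumberOfPartitions) constructor are real Flink classes from org.apache.flink.core.io, but the wrapper object exists only for illustration:

import org.apache.flink.core.io.GenericInputSplit

object SplitCreationSketch {
  // One lightweight GenericInputSplit per parallel task instance: at
  // parallelism 8, the JobManager requests 8 splits. A split carries only
  // its partition number and the total split count; the InputFormat itself
  // decides what data to produce for each split it is handed.
  def createInputSplits(numSplits: Int): Array[GenericInputSplit] =
    (0 until numSplits).map(i => new GenericInputSplit(i, numSplits)).toArray
}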

In your case, each InputSplit is processed very quickly. Hence, there is a race between the DataSourceTasks requesting InputSplits for their InputFormats, and some InputFormats process more than one InputSplit. Since an InputFormat does not reset its internal state (i.e., set i = 0) when it opens a new InputSplit, it only generates data for the first InputSplit it processes.

You can fix this by adding the following method to the StringInputFormat (GenericInputSplit comes from org.apache.flink.core.io):

override def open(split: GenericInputSplit): Unit = {
  super.open(split)
  // Reset the record counter before processing a new InputSplit.
  i = 0
}
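
With this change, the counter is reset at the start of every InputSplit, so each of the 8 splits produces its 50 "0" and 50 "1" records regardless of which task instance ends up processing it, and the program deterministically prints (0,400) and (1,400).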

Upvotes: 1
