Jeppe KS

Reputation: 95

Flink Scala - Comparison method violates its general contract

I am writing a project in Flink that streams a set of query points over batched data and performs a full sequential scan to find the nearest neighbors. What should be a simple sort on a single Float value throws a "Comparison method violates its general contract" error. The main method is defined as:

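import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object StreamingDeCP {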
  var points: Vector[Point] = _

  def main(args: Array[String]): Unit = {
    val queryPointsVec: Vector[Point] = ... // Read from file
    val pointsVec: Vector[Point] = ...      // Read from file

    val streamEnv: StreamExecutionEnvironment = 
                   StreamExecutionEnvironment.getExecutionEnvironment
    val queryPoints = streamEnv.fromCollection(queryPointsVec)

    points = pointsVec
    queryPoints.map(new StreamingSequentialScan)

    streamEnv.execute("StreamingDeCP")
  }

  final class StreamingSequentialScan 
                    extends MapFunction[Point, (Point, Vector[Point])] {

    def map(queryPoint: Point): (Point, Vector[Point]) = {
      val nn = points
                .map{ _.eucDist(queryPoint) }
                .sorted

      (queryPoint, nn)
    }
  }
}

The Point class and companion object are:

case class Point(pointID: Long,
                 descriptor: Vector[Float]) extends Serializable {
  var distance: Float = Float.MaxValue

  def eucDist(that: Point): Point = {
    // Simple arithmetic to calculate and set the distance variable
  }
}

object Point{
  implicit def orderByDistance[A <: Point]: Ordering[A] =
    Ordering.by(_.distance)
}

Here are some notes on what I have tried in order to pinpoint the cause:

I also noticed that the error does not reproduce reliably: running the same code sometimes succeeds. I read the Vector[Point] in a fully deterministic way, so the only possible cause for this behavior must be the Flink scheduler or some stateful computation in the sorting method.

Other posts on the same topic involve a missed case in a custom comparator, but this should be a simple sort on a single Float value, so I have no idea what could cause the issue.

Upvotes: 2

Views: 169

Answers (1)

Andrey Tyukin

Reputation: 44957

I'm not familiar with Flink, but I don't have any reason to assume that it will execute every embarrassingly parallel MapFunction task in a sequential single-threaded manner.

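Since your Point contains a var, and that var is mutated in the map method of the MapFunction, parallel instances of StreamingSequentialScan running in the same JVM all read the same global points and overwrite the distance field of the same Point objects. One instance can therefore change the distances while another is in the middle of sorting, so the comparator sees inconsistent values for the same element. The code can fail with a "Comparison method violates its general contract"-exception whenever the MapFunction is executed with parallelism != 1, and because this is a race condition, it will not fail on every run.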
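The interleaving can be illustrated without Flink. Below is a minimal, single-threaded Scala sketch that replays, step by step, what two parallel map tasks might do to the shared points. `MutPoint`, its 1-D coordinate `x`, and the two query points are hypothetical stand-ins for the question's `Point`, chosen only to keep the arithmetic obvious:

```scala
// Hypothetical stand-in for the question's Point: a case class with a mutable distance field.
case class MutPoint(id: Long, x: Float) {
  var distance: Float = Float.MaxValue

  // Same shape as the question's eucDist: it writes into shared mutable state.
  def eucDist(that: MutPoint): MutPoint = {
    distance = math.abs(x - that.x)
    this
  }
}

// One shared collection, like the global `points` in StreamingDeCP.
val shared = Vector(MutPoint(1, 0f), MutPoint(2, 5f), MutPoint(3, 10f))
val queryA = MutPoint(100, 1f)
val queryB = MutPoint(200, 9f)

// Task A computes distances to its query point ...
shared.foreach(_.eucDist(queryA))
// ... but before it sorts, task B overwrites the same fields for its own query point.
shared.foreach(_.eucDist(queryB))

// Task A now sorts by distances that actually belong to query B.
val seenByA = shared.sortBy(_.distance).map(_.id)

// The order task A should have produced, computed without touching shared state.
val correctForA = shared.sortBy(p => math.abs(p.x - queryA.x)).map(_.id)
```

In a real run, task B's writes can land while task A is already inside `sorted`, so different comparisons see different values for the same element. That inconsistency is what Java's TimSort (used underneath Scala's `sorted`) detects and reports as a contract violation; when the writes happen to land before or after the sort instead, you silently get a wrong order as in this sketch.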

To avoid any side effects inside the map function, you could modify the code as follows:

  • Remove the var from StreamingDeCP and make points an immutable val (for example, pass it to StreamingSequentialScan as a constructor argument).
  • Remove all vars from Point.
  • Implement the method

    def eucDist(other: Point): Double
    

    that simply computes the distance to another point (without mutating anything).

  • Use sortBy:

    val nn = points.sortBy(_.eucDist(queryPoint))
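A minimal sketch of the side-effect-free version described in the list above, assuming a simplified Point that keeps the question's `pointID` and `descriptor: Vector[Float]` fields; the sample points and the query point are made up for illustration:

```scala
// Immutable Point: no vars, and eucDist is a pure function.
case class Point(pointID: Long, descriptor: Vector[Float]) {
  // Returns the Euclidean distance to `other` without mutating anything.
  def eucDist(other: Point): Double =
    math.sqrt(
      descriptor.zip(other.descriptor).map { case (a, b) =>
        val d = (a - b).toDouble
        d * d
      }.sum
    )
}

val points = Vector(
  Point(1, Vector(0f, 0f)),
  Point(2, Vector(3f, 4f)),
  Point(3, Vector(1f, 1f))
)
val queryPoint = Point(100, Vector(0f, 0f))

// Distances are computed on the fly for this query only, so concurrent
// callers sorting for other query points cannot interfere with each other.
val nn = points.sortBy(_.eucDist(queryPoint))
```

Because nothing is written to the shared points, any number of parallel map instances can run this against the same collection.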
    

Alternatively, if you want to avoid recomputing the Euclidean distance multiple times during sorting, precompute the distances once, sort, and then throw the distances away:

val nn = points.map(p => (p, p.eucDist(queryPoint))).sortBy(_._2).map(_._1)
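To make the trade-off concrete, here is a hedged sketch that counts how often the distance function runs in each variant. The 1-D class `P`, the function `dist`, and the sample data are hypothetical; the counter `var` is single-threaded instrumentation only and is not part of the suggested design:

```scala
// Hypothetical 1-D point, used only to keep the distances easy to check by hand.
final case class P(id: Long, x: Double)

// Instrumentation: counts calls to dist (single-threaded, for illustration only).
var calls = 0
def dist(p: P, q: P): Double = { calls += 1; math.abs(p.x - q.x) }

val pts = Vector(P(1, 7.0), P(2, 2.0), P(3, 9.0), P(4, 4.5), P(5, 0.0))
val q = P(0, 3.0)

// Variant 1: sortBy evaluates the key on both sides of every comparison.
calls = 0
val viaSortBy = pts.sortBy(dist(_, q))
val sortByCalls = calls

// Variant 2: compute each distance once, sort the pairs, then drop the distances.
calls = 0
val viaPrecompute = pts.map(p => (p, dist(p, q))).sortBy(_._2).map(_._1)
val precomputeCalls = calls
```

Both variants return the same order, but the precomputed one calls `dist` exactly once per point, whereas `sortBy` calls it twice per comparison, i.e. on the order of n log n times for n points.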

Upvotes: 1
