Reputation: 95
I am writing a project in Flink that involves streaming a set of query points over batched data and performing a full sequential scan to find the nearest neighbors. What should be a simple sort operation on a single Float value throws a "Comparison method violates its general contract" error. The main method is defined as:
object StreamingDeCP {
  var points: Vector[Point] = _

  def main(args: Array[String]): Unit = {
    val queryPointsVec: Vector[Point] = ... // Read from file
    val pointsVec: Vector[Point] = ... // Read from file
    val streamEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val queryPoints = streamEnv.fromCollection(queryPointsVec)
    points = pointsVec
    queryPoints.map(new StreamingSequentialScan)
    streamEnv.execute("StreamingDeCP")
  }

  final class StreamingSequentialScan
      extends MapFunction[Point, (Point, Vector[Point])] {
    def map(queryPoint: Point): (Point, Vector[Point]) = {
      val nn = points
        .map { _.eucDist(queryPoint) }
        .sorted
      (queryPoint, nn)
    }
  }
}
The Point class and companion object are:
case class Point(pointID: Long,
                 descriptor: Vector[Float]) extends Serializable {
  var distance: Float = Float.MaxValue
  def eucDist(that: Point): Point = {
    // Simple arithmetic to calculate and set the distance variable
  }
}
object Point {
  implicit def orderByDistance[A <: Point]: Ordering[A] =
    Ordering.by(_.distance)
}
Here are some notes about the things I have tried in order to pinpoint the cause:
- distance values are between Float.MaxValue and Float.MinValue, and no negative-zero values exist.
- [...] distance variables within the same sort operation (my use case allows for this, but I thought I would check it just in case).
- [...] Point instead of using implicits.
- Sorting by pointID instead of distance, which works but is useless in the context of this problem.
I also noticed that executing the same code does not reproduce the error reliably. I am reading the Vector[Point] in a fully deterministic way, so the only possible cause for this behavior must be the Flink scheduler or some stateful computation in the sorting method.
Other posts on the same topic seem to involve a missed scenario in the custom comparator, but this should be a simple sort operation on a single Float value, so I have no idea about what could cause the issue.
Upvotes: 2
Views: 169
Reputation: 44957
I'm not familiar with Flink, but I don't have any reason to assume that it will execute every embarrassingly parallel MapFunction task in a sequential single-threaded manner.
Since your Point contains vars, and those vars are mutated in the map method of the MapFunction, the code can fail with a "Comparison method violates its general contract" exception whenever the MapFunction is executed with parallelism != 1: one invocation may overwrite the distance values of the shared points while another invocation is still sorting by them.
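To make the hazard concrete without involving Flink or threads at all, here is a minimal single-threaded sketch. MutablePoint and SharedStateDemo are hypothetical names, and the body of eucDist is only my assumption about what the elided method in the question does (store the result in the receiver and return the receiver). It shows that the objects returned for one query point silently change their sort key as soon as a second query point is processed:

```scala
// Hypothetical, simplified stand-in for the Point class in the question.
final class MutablePoint(val id: Long, val descriptor: Vector[Float]) {
  var distance: Float = Float.MaxValue

  // Assumed behaviour of the original eucDist: store the result in the
  // receiver and return the receiver itself.
  def eucDist(that: MutablePoint): MutablePoint = {
    val sumOfSquares = descriptor.zip(that.descriptor)
      .map { case (a, b) => (a - b) * (a - b) }
      .sum
    distance = math.sqrt(sumOfSquares.toDouble).toFloat
    this
  }
}

object SharedStateDemo {
  // Returns the sort keys seen for the first query point, before and after
  // a second query point has been processed against the same shared data.
  def run(): (Vector[Float], Vector[Float]) = {
    val shared = Vector(
      new MutablePoint(1L, Vector(0f, 0f)),
      new MutablePoint(2L, Vector(10f, 0f))
    )
    val q1 = new MutablePoint(100L, Vector(1f, 0f))
    val q2 = new MutablePoint(101L, Vector(9f, 0f))

    val forQ1 = shared.map(_.eucDist(q1)) // same instances as in `shared`
    val before = forQ1.map(_.distance)    // distances to q1
    shared.foreach(_.eucDist(q2))         // what a second map call would do
    val after = forQ1.map(_.distance)     // now holds distances to q2
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before: $before, after: $after")
    // prints "before: Vector(1.0, 9.0), after: Vector(9.0, 1.0)"
  }
}
```

If the second call happens on another thread while the first one is in the middle of .sorted, the comparator sees keys that change between comparisons, which is the kind of inconsistency the exception complains about.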
To avoid any side effects inside the map function, you could modify the code as follows:
- Remove the vars from main, and make points an immutable val.
- Remove the vars from Point.
- Implement the method
  def eucDist(other: Point): Double
  that simply computes the distance to another point (without mutating anything).
- Use sortBy:
  val nn = points.sortBy(_.eucDist(queryPoint))
Alternatively, if you want to avoid recomputing the Euclidean distance multiple times during sorting, precompute the distances once, sort, and then throw the distances away:
val nn = points.map(p => (p, p.eucDist(queryPoint))).sortBy(_._2).map(_._1)
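Put together, a side-effect-free version might look roughly like the sketch below. The distance formula is my guess at the "simple arithmetic" elided in the question, and NearestNeighbors and sortedByDistance are made-up names for illustration. Nothing here depends on Flink, so it can be tried in isolation:

```scala
// Hypothetical immutable replacement for the Point class in the question.
final case class Point(pointID: Long, descriptor: Vector[Float]) {
  // Pure function: computes the Euclidean distance and mutates nothing.
  def eucDist(other: Point): Double = {
    val sumOfSquares = descriptor.zip(other.descriptor)
      .map { case (a, b) => val d = (a - b).toDouble; d * d }
      .sum
    math.sqrt(sumOfSquares)
  }
}

object NearestNeighbors {
  // Precompute each distance once, sort by it, then drop it again.
  def sortedByDistance(points: Vector[Point], query: Point): Vector[Point] =
    points
      .map(p => (p, p.eucDist(query)))
      .sortBy(_._2)
      .map(_._1)

  def main(args: Array[String]): Unit = {
    val points = Vector(
      Point(1L, Vector(0f, 0f)),
      Point(2L, Vector(5f, 0f)),
      Point(3L, Vector(3f, 0f))
    )
    val query = Point(0L, Vector(1f, 0f))
    println(sortedByDistance(points, query).map(_.pointID))
    // prints "Vector(1, 3, 2)"
  }
}
```

Because every call works only on its own local tuples, it no longer matters how many map invocations run at the same time over the same points.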
Upvotes: 1