To test out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates for a particle, along with the time t at which the position was recorded. My objective is to annotate this data with the velocity of each particle. So the stream might look something like this.
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
Now there is no guarantee that the events will arrive in order, i.e. 1612103771213 p2 -0.1 -0.1 might arrive, say, 10ms before 1612103771212 p2 0.0 0.0. For simplicity, it can be assumed that any late data will arrive within 100ms of the early data.
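For concreteness, taking velocity to simply mean displacement divided by elapsed time, p1 moving from (0.0, 0.0) at 1612103771212 to (0.1, 0.1) at 1612103771213 would be annotated with vx = 0.1 / 0.001s = 100 and vy = 100 (distance units per second).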
I will admit that I am new to stream processing and Flink, so this might be a stupid question with an obvious answer, but I am currently stumped as to how to achieve my objective here.
EDIT
Following David's answer, I tried using the Flink Table API to sort the DataStream, with nc -lk 9999 as a text socket source. The issue is that nothing gets printed to the console until I close the text socket stream. Here is the Scala code I wrote -
package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration

object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment and the table environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, bSettings)
    env.setParallelism(1)

    // Obtain the input data by connecting to the local socket on port 9999.
    val text = env.socketTextStream("localhost", 9999)

    val objStream = text
      .filter(_.nonEmpty)
      .map(new ParticleMapFunction)

    // Extract event timestamps and emit watermarks for bounded out-of-orderness.
    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(pos: ParticlePos, recordTimestamp: Long): Long = pos.t
          })
      )

    // Expose the stream as a table with an event-time (rowtime) attribute, then sort by it.
    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
    sortedPosStream.print()

    // Execute the program.
    env.execute()
  }

  case class ParticlePos(t: Long, name: String, x: Double, y: Double) extends Serializable

  case class ParticlePosVal(t: Long, name: String, x: Double, y: Double,
                            var vx: Double = 0.0, var vy: Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(line: String): ParticlePos = {
      // Split on whitespace (not \W+) so minus signs and decimal points survive parsing.
      val parts = line.trim.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }
}
Upvotes: 4
Views: 3829
In general, watermarks in combination with event-time timers are the solution to the problems posed by out-of-order event streams. The section of the official Flink training that covers Event Time and Watermarks explains how this works.
At a higher level, it is sometimes easier to use something like Flink's CEP library or Flink SQL, because they make it very easy to sort a stream by time, thus removing all of the out-of-orderness. See How to sort a stream by event time using Flink SQL for an example of a Flink DataStream program that uses Flink SQL to sort a stream by event time.
In your case, a fairly simple MATCH_RECOGNIZE query would do what you're looking for. It might look something like this:
SELECT *
FROM event
MATCH_RECOGNIZE (
  PARTITION BY particleId
  ORDER BY ts
  MEASURES
    b.ts AS ts,
    b.particleId AS pid,
    velocity(a, b) AS velocity
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (a b)
  DEFINE
    a AS TRUE,
    b AS TRUE
)
where velocity(a, b) is a user-defined function that computes the velocity, given two sequential events (a and b) for the same particle.
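For reference, here is a rough, hypothetical sketch of such a UDF in Scala. Note that a scalar function receives column values rather than pattern variables, so in practice the measure would be written as something like velocity(a.ts, a.x, a.y, b.ts, b.x, b.y):

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: magnitude of the average velocity between two recorded positions.
class Velocity extends ScalarFunction {
  def eval(t1: Long, x1: Double, y1: Double,
           t2: Long, x2: Double, y2: Double): Double = {
    val dt = (t2 - t1).toDouble / 1000.0 // timestamps are in milliseconds
    if (dt <= 0.0) 0.0
    else math.sqrt((x2 - x1) * (x2 - x1) + (y2 - y1) * (y2 - y1)) / dt
  }
}

It could then be registered with, e.g., tableEnv.createTemporarySystemFunction("velocity", classOf[Velocity]).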
Upvotes: 3
One way of doing this in Flink might be to use a KeyedProcessFunction, i.e. a function that can process each event of a keyed stream one by one, keep fault-tolerant state, and react to event-time timers. So it would go something like this:

- Define a watermark strategy matching the expected out-of-orderness, e.g. upon receiving an event with timestamp 1612103771212 you decide to consider you're sure to have received all data until 1612103771112.
- .keyBy() your stream, keying by particle id. This means that the logic of the next operators in your Flink application can now be expressed in terms of a sequence of events of just one particle, and each particle is processed in this manner in parallel. Something like this: yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
- During the initialization of YourProcessFunction (i.e. during the open method), initialize a ListState where you can safely store stuff.
- In the processElement method, just add each incoming element to the listState and register a timer trigger in, say, 100ms.
- When the onTimer() method triggers, say at time t, look at all elements in the listState that have a time < t - 100 and, if you have at least two of them, sort them, remove them from the state, apply the speed calculation and annotation logic you described, and emit the result downstream (a sketch follows after this list).

You'll find an example in the official Flink training that uses this kind of logic for the duration of taxi rides, which has lots of similarities with your use case. Have a look also at the various Readme.md files of that repo for more details.
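A minimal sketch of such a KeyedProcessFunction, reusing the ParticlePos and ParticlePosVal case classes from the question (the 100ms threshold and the finite-difference velocity computation are assumptions based on the problem statement):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class ValAnnotator extends KeyedProcessFunction[String, ParticlePos, ParticlePosVal] {

  // Buffer of positions that may still be reordered by late arrivals.
  private var buffer: ListState[ParticlePos] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[ParticlePos]("buffer", classOf[ParticlePos]))
  }

  override def processElement(
      pos: ParticlePos,
      ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#Context,
      out: Collector[ParticlePosVal]): Unit = {
    buffer.add(pos)
    // Fire once the watermark says everything up to pos.t should have arrived.
    ctx.timerService().registerEventTimeTimer(pos.t + 100)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#OnTimerContext,
      out: Collector[ParticlePosVal]): Unit = {
    val all = buffer.get().asScala.toSeq
    // Positions older than t - 100 are safe to process; newer ones stay buffered.
    val (ready, pending) = all.partition(_.t <= timestamp - 100)
    if (ready.size >= 2) {
      val sorted = ready.sortBy(_.t)
      sorted.sliding(2).foreach { case Seq(prev, cur) =>
        val dt = (cur.t - prev.t).toDouble / 1000.0 // ms -> s
        val vx = if (dt > 0) (cur.x - prev.x) / dt else 0.0
        val vy = if (dt > 0) (cur.y - prev.y) / dt else 0.0
        out.collect(ParticlePosVal(cur.t, cur.name, cur.x, cur.y, vx, vy))
      }
      // Keep the newest processed position so the next batch has a predecessor.
      buffer.update((sorted.last +: pending).asJava)
    }
  }
}

Because the function buffers and sorts per particle itself, it can be applied to the unsorted stream directly, e.g. posStream.keyBy(_.name).process(new ValAnnotator).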
Upvotes: 3