Reputation: 25
I am wondering whether it is possible to obtain a record's timestamp when using Flink's ingestion time mode. Consider the following Flink code example (https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala):
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)
val joined = joinStreams(grades, salaries, windowSize)
...
case class Grade(name: String, level: Int)
case class Salary(name: String, salary: Int)
By default, neither Grade nor Salary contains a timestamp field. However, since Flink allows using "ingestionTime" to assign a wall-clock timestamp to the records in a data stream, is it possible to obtain that timestamp at runtime? For example, here is what I am trying to do:
val oldDatastream = env.addSource... // Using ingestion time
val newDatastream = oldDatastream.map { record =>
  val ts = getRecordTimestamp(record)
  // do something with ts
}
Thanks for any help.
Upvotes: 1
Views: 3245
Reputation: 859
Use a ProcessFunction, which gives you a Context that you can use to get the element's timestamp (whether it is ingestion, processing, or event time).
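As a rough sketch of what that could look like with the Scala DataStream API: the example below assumes a Flink version that still ships the Scala API and setStreamTimeCharacteristic, as in the question. The names AttachTimestamp and IngestionTimestampExample and the sample elements are made up for illustration. Note that ctx.timestamp() returns a boxed java.lang.Long that can be null when no timestamp is attached to the record (for instance under processing time), so the sketch checks for that.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Grade(name: String, level: Int)

// Hypothetical helper: pairs each record with the timestamp Flink attached to it.
class AttachTimestamp extends ProcessFunction[Grade, (Grade, Long)] {
  override def processElement(
      record: Grade,
      ctx: ProcessFunction[Grade, (Grade, Long)]#Context,
      out: Collector[(Grade, Long)]): Unit = {
    // timestamp() is a boxed Long; it may be null if the record carries no timestamp.
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect((record, ts.longValue()))
    }
  }
}

object IngestionTimestampExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With ingestion time, timestamps are assigned when records enter the source.
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    // Sample data standing in for the real source from the question.
    val grades: DataStream[Grade] =
      env.fromElements(Grade("tom", 1), Grade("jerry", 3))

    grades
      .process(new AttachTimestamp)
      .print()

    env.execute("ingestion-timestamp-example")
  }
}
```

Compared with map, process is the operator that exposes the Context, so the map in the question would become a process call and the "do something with ts" logic would live inside processElement.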
Upvotes: 5