Bruce REN

Reputation: 25

Apache Flink: How to get timestamp of events in ingestion time mode?

I am wondering whether it is possible to obtain the timestamp of a record when using Flink's ingestion time mode. Consider the following Flink code example (https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)

val joined = joinStreams(grades, salaries, windowSize)

...
case class Grade(name: String, level: Int) 
case class Salary(name: String, salary: Int)

By default, neither Grade nor Salary contains a timestamp field. However, since Flink's ingestion time mode assigns the wall-clock time to each record as it enters the data stream, is it possible to obtain that timestamp at runtime? For example, here is what I am trying to do:

val oldDatastream = env.addSource...  // using ingestion time
val newDatastream = oldDatastream.map { record =>
  val ts = getRecordTimestamp(record)
  // do something with ts
}

Thanks for any help.

Upvotes: 1

Views: 3245

Answers (1)

Alex

Reputation: 859

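Use a ProcessFunction, which gives you a Context that you can use to get the element's timestamp via ctx.timestamp(). This works whether the timestamp was assigned as ingestion time or as event time; under processing time no timestamp is attached to the records, so ctx.timestamp() returns null.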
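A minimal sketch of this suggestion, assuming the Flink 1.x Scala DataStream API used in the question (a version where setStreamTimeCharacteristic still exists). The class AttachTimestamp, the object IngestionTimestampExample and the sample data are hypothetical names for illustration; ctx.timestamp() is the accessor on ProcessFunction's Context and returns a java.lang.Long that may be null.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Same shape as the Grade case class in the question.
case class Grade(name: String, level: Int)

// Hypothetical helper: pairs each record with the timestamp Flink attached to it.
class AttachTimestamp[T] extends ProcessFunction[T, (T, Long)] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, (T, Long)]#Context,
      out: Collector[(T, Long)]): Unit = {
    // ctx.timestamp() is a java.lang.Long; it is null when no timestamp
    // was assigned (e.g. under processing time), so such records are skipped here.
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect((value, ts.longValue()))
    }
  }
}

object IngestionTimestampExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With ingestion time, the source stamps each record with the wall-clock time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val grades: DataStream[Grade] =
      env.fromElements(Grade("tom", 1), Grade("jerry", 2))

    // Each output element is (record, ingestion timestamp in epoch milliseconds).
    val withTs: DataStream[(Grade, Long)] = grades.process(new AttachTimestamp[Grade])
    withTs.print()

    env.execute("ingestion timestamp example")
  }
}
```

A ProcessFunction is used rather than the map from the question because a plain MapFunction has no access to the record's timestamp, while processElement receives the Context alongside each element.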

Upvotes: 5
