Tom

Late event seems not to be dropped when doing an interval join between two streams

I am using Flink 1.11 and have the following test case to try out an event-time-based interval join.

The data for the two streams are defined as follows:

object JoinStockInterval {
  // The stock data.
  // ts is an implicit method that converts a time string to a timestamp.
  val stocks = Seq(
    Stock("id1", "2020-09-16 20:50:15".ts, 1),
    Stock("id1", "2020-09-16 20:50:12".ts, 2),
    Stock("id1", "2020-09-16 20:50:18".ts, 4),
    Stock("id1", "2020-09-16 20:50:11".ts, 3),
    Stock("id1", "2020-09-16 20:50:11".ts, 10),
    Stock("id1", "2020-09-16 20:50:13".ts, 5),
    Stock("id1", "2020-09-16 20:50:20".ts, 6),
    Stock("id1", "2020-09-16 20:50:14".ts, 7),
    Stock("id1", "2020-09-16 20:50:22".ts, 8),
    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)
  )

  // Mock a stock name that changes over time
  val stockNameChangings = Seq(
    StockNameChanging("id1", "Stock1", "2020-09-16 20:50:16".ts),
    StockNameChanging("id1", "Stock101", "2020-09-16 20:50:20".ts),
    StockNameChanging("id1", "Stock4", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts),
    StockNameChanging("id1", "Stock5", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts),
    StockNameChanging("id1", "Stock6", "2020-09-16 20:50:23".ts)
  )

}

The test case is defined as follows; each stream allows for 4 seconds of lateness:

 test("test interval join inner 2 works") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000)) //allow 4 secs lateness
    val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000)) //allow 4 secs lateness
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("s1", ds1, $"id", $"price", $"trade_date".rowtime() as "rt1")
    tenv.createTemporaryView("s2", ds2, $"id", $"name", $"trade_date".rowtime() as "rt2")
    tenv.from("s1").printSchema()
    tenv.from("s2").printSchema()
    val sql =
      """
      select s1.id, s2.name, s1.price, cast (s1.rt1 as timestamp) as rt1, s2.rt2
      from s1 join s2
      on s1.id = s2.id
      where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }
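To see which pairs the query's predicate admits when nothing is dropped at all, the join can be evaluated over the complete data as an ordinary nested loop. This is a plain-Scala sketch, not Flink: the case classes are hypothetical stand-ins that carry only the fields the query uses, and ts is a hypothetical replacement for the question's implicit conversion (it reads the strings as UTC, which does not affect relative comparisons).

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

object BatchIntervalJoin {
  // Hypothetical stand-ins for the question's case classes (only the fields the query uses).
  final case class Stock(id: String, tradeDate: Long, price: Double)
  final case class StockNameChanging(id: String, name: String, tradeDate: Long)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Hypothetical replacement for the implicit .ts: epoch millis, reading the string as UTC.
  def ts(s: String): Long =
    LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  val stocks: Seq[Stock] = Seq(
    Stock("id1", ts("2020-09-16 20:50:15"), 1.0),
    Stock("id1", ts("2020-09-16 20:50:12"), 2.0),
    Stock("id1", ts("2020-09-16 20:50:18"), 4.0),
    Stock("id1", ts("2020-09-16 20:50:11"), 3.0),
    Stock("id1", ts("2020-09-16 20:50:11"), 10.0),
    Stock("id1", ts("2020-09-16 20:50:13"), 5.0),
    Stock("id1", ts("2020-09-16 20:50:20"), 6.0),
    Stock("id1", ts("2020-09-16 20:50:14"), 7.0),
    Stock("id1", ts("2020-09-16 20:50:22"), 8.0),
    Stock("id1", ts("2020-09-16 20:50:40"), 9.0),
    Stock("id1", ts("2020-09-16 20:50:15"), 100.0)
  )

  val names: Seq[StockNameChanging] = Seq(
    StockNameChanging("id1", "Stock1", ts("2020-09-16 20:50:16")),
    StockNameChanging("id1", "Stock101", ts("2020-09-16 20:50:20")),
    StockNameChanging("id1", "Stock4", ts("2020-09-16 20:50:17")),
    StockNameChanging("id1", "Stock7", ts("2020-09-16 20:50:21")),
    StockNameChanging("id1", "Stock5", ts("2020-09-16 20:50:17")),
    StockNameChanging("id1", "Stock501", ts("2020-09-16 20:50:22")),
    StockNameChanging("id1", "Stock6", ts("2020-09-16 20:50:23"))
  )

  // s1.id = s2.id AND s1.rt1 BETWEEN s2.rt2 - bound AND s2.rt2 + bound,
  // evaluated over the complete data: there are no watermarks, so nothing is ever dropped.
  def join(boundMillis: Long = 2000L): Seq[(Double, String)] =
    for {
      s <- stocks
      n <- names
      if s.id == n.id
      if s.tradeDate >= n.tradeDate - boundMillis && s.tradeDate <= n.tradeDate + boundMillis
    } yield (s.price, n.name)

  def main(args: Array[String]): Unit = {
    val pairs = join()
    pairs.foreach { case (price, name) => println(s"$price,$name") }
    println(s"${pairs.size} matching pairs")
  }
}
```

On the question's data this yields 18 pairs, the same (price, name) combinations as the 18 rows of the streaming result, including the three for the price-100 record. In other words, the streaming join dropped nothing.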

The join result is as follows:

id1,Stock1,1.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock1,4.0,2020-09-16T12:50:18,2020-09-16T12:50:16
id1,Stock101,4.0,2020-09-16T12:50:18,2020-09-16T12:50:20
id1,Stock4,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock4,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock5,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock101,6.0,2020-09-16T12:50:20,2020-09-16T12:50:20
id1,Stock7,6.0,2020-09-16T12:50:20,2020-09-16T12:50:21
id1,Stock501,6.0,2020-09-16T12:50:20,2020-09-16T12:50:22
id1,Stock1,7.0,2020-09-16T12:50:14,2020-09-16T12:50:16
id1,Stock101,8.0,2020-09-16T12:50:22,2020-09-16T12:50:20
id1,Stock501,8.0,2020-09-16T12:50:22,2020-09-16T12:50:22
id1,Stock7,8.0,2020-09-16T12:50:22,2020-09-16T12:50:21
id1,Stock6,8.0,2020-09-16T12:50:22,2020-09-16T12:50:23
id1,Stock1,100.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock4,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17

The last rows of the result above look wrong to me. They come from Stock("id1", "2020-09-16 20:50:15".ts, 100) in the stocks stream, but that record is already late in the stocks stream, as the following two records show. Why is this record not dropped, and why does it still join successfully with the other stream (the name-changing stream)?

    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)

The watermark generators use AssignerWithPunctuatedWatermarks.
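The claim that the record is late for its own stream can be checked by simulating the stocks stream on its own. The question does not show StockWatermarkGenerator, so this sketch assumes it behaves like a bounded-out-of-orderness generator: after every element it emits a watermark equal to the largest timestamp seen so far minus 4 seconds. Timestamps are written as seconds after 20:50:00, and a record counts as late when its timestamp is at or below the watermark already emitted, since a watermark w asserts that no more elements with timestamp <= w will arrive.

```scala
object StockStreamWatermark {
  // (seconds after 2020-09-16 20:50:00, price) for the stocks stream, in emission order.
  val stocks: Seq[(Int, Double)] = Seq(
    15 -> 1.0, 12 -> 2.0, 18 -> 4.0, 11 -> 3.0, 11 -> 10.0, 13 -> 5.0,
    20 -> 6.0, 14 -> 7.0, 22 -> 8.0, 40 -> 9.0, 15 -> 100.0
  )

  final case class Classified(second: Int, price: Double, watermark: Option[Int], late: Boolean)

  // Assumed generator behaviour: after each element, emit (largest timestamp seen) - lateness.
  // Each element is compared with the watermark that was already emitted before it arrived.
  def classify(events: Seq[(Int, Double)], lateness: Int): Seq[Classified] = {
    var maxSeen: Option[Int] = None
    events.map { case (t, price) =>
      val wm = maxSeen.map(_ - lateness)
      // A watermark w asserts that no more elements with timestamp <= w will arrive.
      val late = wm.exists(w => t <= w)
      maxSeen = Some(maxSeen.fold(t)(m => math.max(m, t)))
      Classified(t, price, wm, late)
    }
  }

  def main(args: Array[String]): Unit =
    classify(stocks, lateness = 4).foreach { c =>
      val wmText = c.watermark.map(w => s"20:50:$w").getOrElse("none")
      println(s"ts=20:50:${c.second} price=${c.price} watermark=$wmText late=${c.late}")
    }
}
```

Under this assumption the price-100 record arrives when the stocks stream's watermark is already 20:50:36, so it is indeed late for its own stream. The sketch also flags the records with prices 3, 10, 5 and 7; of these, only price 7 (20:50:14) has a matching name change, and it also appears in the join result.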

Answers (1)

David Anderson

The record you are wondering about

Stock("id1", "2020-09-16 20:50:15".ts, 100)

isn't late from the point of view of the join.

The reason has to do with how watermarks are propagated when an operator has multiple inputs (like this interval join). The current watermark at the join operator is always the minimum of the watermarks received so far from all of its input channels.
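The rule can be modelled in a few lines. This is a simplified sketch of the idea, not Flink's actual implementation: it keeps the highest watermark seen on each input, exposes the minimum across inputs as the operator's watermark, and advances only when that minimum increases. It ignores details such as idle inputs. The values in main are seconds after 20:50:00 and assume that each stream's watermark is its largest timestamp minus 4 seconds.

```scala
object TwoInputWatermark {
  // Simplified model (not Flink's code): track the highest watermark per input and
  // expose the minimum across inputs as the operator's watermark.
  final class CombinedWatermark(numInputs: Int) {
    private val perInput = Array.fill(numInputs)(Long.MinValue)
    private var current = Long.MinValue

    // Records a watermark from one input; returns the new operator watermark if it advanced.
    def onWatermark(input: Int, wm: Long): Option[Long] = {
      perInput(input) = math.max(perInput(input), wm)
      val combined = perInput.min
      if (combined > current) {
        current = combined
        Some(current)
      } else None
    }

    def currentWatermark: Long = current
  }

  def main(args: Array[String]): Unit = {
    val join = new CombinedWatermark(numInputs = 2)
    join.onWatermark(0, 36L) // stocks input: 20:50:40 - 4 s, as seconds after 20:50:00
    join.onWatermark(1, 17L) // name-change input: 20:50:21 - 4 s
    println(join.currentWatermark) // prints 17
  }
}
```

However far ahead the stocks input gets, the operator's watermark cannot pass the name-change input's watermark.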

So until the join has processed this record

StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts)

the watermark at the join is determined by this record

StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts)

and so the watermark still falls within the interval defined for the join.
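The argument can be replayed numerically for the record in question. This sketch rests on two assumptions not stated in the question: each stream's watermark is its largest timestamp minus 4 seconds, and when the price-100 record reaches the join, every earlier stocks record has arrived but only the name changes up to Stock7 have. The "inside the range" test is a simplification of the argument above, not Flink's exact state-retention rule.

```scala
object JoinLatenessCheck {
  val lateness = 4 // seconds, as passed to both watermark generators (4000 ms)
  val bound = 2    // seconds, from the query's "interval '2' second" on both sides

  // Seconds after 2020-09-16 20:50:00, in emission order (from the question's data).
  val stockSeconds: Seq[Int] = Seq(15, 12, 18, 11, 11, 13, 20, 14, 22, 40, 15)
  // Assumed interleaving: only the name changes up to Stock7 have reached the join.
  val nameSecondsSoFar: Seq[Int] = Seq(16, 20, 17, 21)

  // Assumed generator behaviour: stream watermark = largest timestamp seen - lateness.
  def streamWatermark(seen: Seq[Int]): Int = seen.max - lateness

  final case class Snapshot(stocksWm: Int, namesWm: Int, joinWm: Int, lo: Int, hi: Int)

  // State at the moment the last stocks record (price 100, 20:50:15) reaches the join.
  def snapshot: Snapshot = {
    val stocksWm = streamWatermark(stockSeconds.init) // 40 - 4 = 36
    val namesWm = streamWatermark(nameSecondsSoFar)   // 21 - 4 = 17
    val t = stockSeconds.last                         // 15
    // The join's watermark is the minimum of its inputs; partners need rt2 in [t - bound, t + bound].
    Snapshot(stocksWm, namesWm, math.min(stocksWm, namesWm), t - bound, t + bound)
  }

  def main(args: Array[String]): Unit = {
    val s = snapshot
    println(s"stocks watermark: 20:50:${s.stocksWm}, join watermark: 20:50:${s.joinWm}")
    println(s"record at 20:50:15 can match rt2 in [20:50:${s.lo}, 20:50:${s.hi}]")
    println(s"join watermark inside that range: ${s.joinWm >= s.lo && s.joinWm <= s.hi}")
  }
}
```

Under these assumptions the stocks stream alone is at 20:50:36, but the join is only at 20:50:17, which still lies within [20:50:13, 20:50:17], the range of name-change timestamps the record can match.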

Watermarks work this way because they represent an assertion that the stream can now be considered complete up to the timestamp of the watermark. From the join's point of view, it only has complete knowledge up to the watermark of whichever input stream is furthest behind.
