Reputation: 6332
I am using Flink 1.11 and have the following test case to try out an event-time-based interval join.
The data for the two streams is defined as follows:
object JoinStockInterval {
  // the stocks data;
  // ts is the implicit method that converts the time string to a timestamp
  val stocks = Seq(
    Stock("id1", "2020-09-16 20:50:15".ts, 1),
    Stock("id1", "2020-09-16 20:50:12".ts, 2),
    Stock("id1", "2020-09-16 20:50:18".ts, 4),
    Stock("id1", "2020-09-16 20:50:11".ts, 3),
    Stock("id1", "2020-09-16 20:50:11".ts, 10),
    Stock("id1", "2020-09-16 20:50:13".ts, 5),
    Stock("id1", "2020-09-16 20:50:20".ts, 6),
    Stock("id1", "2020-09-16 20:50:14".ts, 7),
    Stock("id1", "2020-09-16 20:50:22".ts, 8),
    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)
  )
  // mock that the stock name is changing over time
  val stockNameChangings = Seq(
    StockNameChanging("id1", "Stock1", "2020-09-16 20:50:16".ts),
    StockNameChanging("id1", "Stock101", "2020-09-16 20:50:20".ts),
    StockNameChanging("id1", "Stock4", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts),
    StockNameChanging("id1", "Stock5", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts),
    StockNameChanging("id1", "Stock6", "2020-09-16 20:50:23".ts)
  )
}
The test case is defined as follows; each stream allows for 4 seconds of lateness:
test("test interval join inner 2 works") {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  val ds1 = env
    .addSource(new IntervalJoinStockSource(emitInterval = 0))
    .assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000)) // allow 4 secs lateness
  val ds2 = env
    .addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0))
    .assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000)) // allow 4 secs lateness
  val tenv = StreamTableEnvironment.create(env)
  tenv.createTemporaryView("s1", ds1, $"id", $"price", $"trade_date".rowtime() as "rt1")
  tenv.createTemporaryView("s2", ds2, $"id", $"name", $"trade_date".rowtime() as "rt2")
  tenv.from("s1").printSchema()
  tenv.from("s2").printSchema()
  val sql =
    """
    select s1.id, s2.name, s1.price, cast (s1.rt1 as timestamp) as rt1, s2.rt2
    from s1 join s2
    on s1.id = s2.id
    where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second
    """.stripMargin(' ')
  tenv.sqlQuery(sql).toAppendStream[Row].print()
  env.execute()
}
The join result is as follows:
id1,Stock1,1.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock1,4.0,2020-09-16T12:50:18,2020-09-16T12:50:16
id1,Stock101,4.0,2020-09-16T12:50:18,2020-09-16T12:50:20
id1,Stock4,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock4,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock5,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock101,6.0,2020-09-16T12:50:20,2020-09-16T12:50:20
id1,Stock7,6.0,2020-09-16T12:50:20,2020-09-16T12:50:21
id1,Stock501,6.0,2020-09-16T12:50:20,2020-09-16T12:50:22
id1,Stock1,7.0,2020-09-16T12:50:14,2020-09-16T12:50:16
id1,Stock101,8.0,2020-09-16T12:50:22,2020-09-16T12:50:20
id1,Stock501,8.0,2020-09-16T12:50:22,2020-09-16T12:50:22
id1,Stock7,8.0,2020-09-16T12:50:22,2020-09-16T12:50:21
id1,Stock6,8.0,2020-09-16T12:50:22,2020-09-16T12:50:23
id1,Stock1,100.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock4,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
The last record in the result above looks strange. It comes from Stock("id1", "2020-09-16 20:50:15".ts, 100)
in the stocks stream, but that record arrives late in the stocks stream.
Consider the following two consecutive records in the stocks stream. Why is the second one not dropped, and instead joined successfully with the other stream (the name-changing stream)?
Stock("id1", "2020-09-16 20:50:40".ts, 9),
Stock("id1", "2020-09-16 20:50:15".ts, 100)
The watermark strategy uses AssignerWithPunctuatedWatermarks.
Reputation: 43454
The record you are wondering about
Stock("id1", "2020-09-16 20:50:15".ts, 100)
isn't late from the point of view of the join.
The reason has to do with how watermarks are propagated when an operator has multiple inputs (like this interval join). The current watermark at the join operator is always the smallest of the watermarks received so far from all of the input channels.
So until the join has processed this record
StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts)
the watermark at the join is determined by this record
StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts)
and so the watermark still falls within the interval defined for the join.
Watermarks work this way because they represent an assertion that the stream can now be considered complete up to the timestamp of the watermark. And from the point of view of the join, it only has complete knowledge up to the watermark of whichever stream is the furthest behind.
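To make the minimum-of-inputs rule concrete, here is a minimal sketch in plain Scala. It is not Flink code: `TwoInputWatermarkTracker` and `WatermarkDemo` are hypothetical names, and the demo assumes that each watermark generator emits "largest timestamp seen minus 4000 ms", which may not match the actual generators in the question. Timestamps are written as milliseconds within the minute 20:50 for brevity.

```scala
// Hypothetical model of how a two-input operator combines watermarks.
// This only illustrates the rule; it is not Flink's implementation.
final class TwoInputWatermarkTracker {
  // Latest watermark received on each input; nothing received yet.
  private var wm1: Long = Long.MinValue
  private var wm2: Long = Long.MinValue

  /** Record a watermark from input 1 and return the combined watermark. */
  def onWatermark1(ts: Long): Long = { wm1 = math.max(wm1, ts); current }

  /** Record a watermark from input 2 and return the combined watermark. */
  def onWatermark2(ts: Long): Long = { wm2 = math.max(wm2, ts); current }

  /** The operator's watermark is the smaller of the two input watermarks. */
  def current: Long = math.min(wm1, wm2)
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new TwoInputWatermarkTracker
    // Assumption: stocks input has seen 20:50:40, so its watermark is 40 s - 4 s.
    tracker.onWatermark1(36000L)
    // Assumption: name-change input has seen 20:50:21, so its watermark is 21 s - 4 s.
    tracker.onWatermark2(17000L)
    // The join's watermark follows the slower input.
    println(tracker.current) // prints 17000
  }
}
```

Under these assumptions the stocks input alone would already be at 20:50:36, but the join's own watermark stays with the name-change input until that input advances.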