Reputation: 6342
I have two streams on which I want to do an interval join. The event types are the following case classes; tradeDate's type is java.sql.Timestamp:
case class Stock(id: String, tradeDate: Timestamp, price: Double)
case class StockNameChanging(id: String, name: String, tradeDate: Timestamp)
When I run the following application, the exception below is thrown. I have no idea what it means or how to fix the problem.
org.apache.flink.table.api.TableException: Found more than one rowtime field: [rt1, rt2] in the table that should be converted to a DataStream.
Please select the rowtime field that should be used as event-time timestamp for the DataStream by casting all other fields to TIMESTAMP.
The code is:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//Stock stream
val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000))
//StockNameChanging stream
val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("s1", ds1, $"id", $"price", $"tradeDate".rowtime() as "rt1")
tenv.createTemporaryView("s2", ds2, $"id", $"name", $"tradeDate".rowtime() as "rt2")
tenv.from("s1").printSchema()
tenv.from("s2").printSchema()
val sql =
  """
    |SELECT s1.id, s2.name, s1.price, s1.rt1, s2.rt2
    |FROM s1 JOIN s2
    |ON s1.id = s2.id
    |WHERE s1.rt1 BETWEEN s2.rt2 - INTERVAL '2' SECOND AND s2.rt2 + INTERVAL '2' SECOND
  """.stripMargin
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
Upvotes: 0
Views: 1004
Reputation: 43697
Whenever Flink does event-time processing, each event needs an event-time timestamp. When using Flink SQL, if you want to do event-time processing, each row must have exactly one rowtime attribute. However, the table produced by your query has two event-time attributes, s1.rt1 and s2.rt2, so the Flink SQL runtime complains: it cannot assign a unique timestamp to each row of the result table.
Since you're not doing any further event-time processing downstream in this pipeline, you don't need those columns to remain rowtime attributes; casting one (or both) of them to TIMESTAMP turns them into regular timestamp columns. I believe something like this will work:
SELECT
  s1.id, s2.name, s1.price,
  CAST(s1.rt1 AS TIMESTAMP) AS t1,
  CAST(s2.rt2 AS TIMESTAMP) AS t2
FROM
  s1 JOIN s2
...
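Putting this together with the interval join from the question, the complete query would look something like the sketch below. Note that the BETWEEN condition can still reference the original rowtime attributes rt1 and rt2, since the CAST only happens in the projection; the join itself is still evaluated on the time attributes.

```sql
SELECT
  s1.id,
  s2.name,
  s1.price,
  CAST(s1.rt1 AS TIMESTAMP) AS t1,
  CAST(s2.rt2 AS TIMESTAMP) AS t2
FROM s1 JOIN s2
  ON s1.id = s2.id
WHERE s1.rt1 BETWEEN s2.rt2 - INTERVAL '2' SECOND
                 AND s2.rt2 + INTERVAL '2' SECOND
```

With this projection the result table carries no rowtime attributes at all, so toAppendStream no longer has to choose between them and the TableException goes away.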
Upvotes: 4