Reputation: 337
I have a streaming input, say stock price data (covering multiple stocks), and I want to rank the stocks by price every minute. The ranking is based on each stock's latest price and must include all stocks, whether or not a stock was updated in the previous minute. I tried to use ORDER BY in Flink streaming SQL.
I failed to implement my logic and I am confused about two parts:
Why can ORDER BY only use a time attribute as the primary sort key, and only in ASC order? How can I order by another field, such as price?
What does the SQL below (from the Flink documentation) mean? There is no window, so I assume the query is executed immediately for each order that comes in; in that case, it seems meaningless to sort a single element.
[Update]: When I read the code of ProcTimeSortProcessFunction.scala, it seems that Flink sorts the elements received during the next millisecond.
SELECT *
FROM Orders
ORDER BY orderTime
Finally, is there a way to implement my logic in SQL?
Upvotes: 0
Views: 2386
Reputation: 18987
ORDER BY in streaming queries is difficult to compute because we don't want to update the whole result when we have to emit a row that belongs at the beginning of the result table. Therefore, we only support ORDER BY time-attribute if we can guarantee that the results have (roughly) increasing timestamps.
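To make the "roughly increasing timestamps" point concrete, here is a minimal sketch in plain Python (not Flink code). It assumes that no record arrives more than max_delay time units late; that bound, the function name, and the watermark variable are illustrative assumptions, not something stated above. Under that assumption the output is append-only: a row, once emitted, never has to be revised.

```python
import heapq

def sort_by_time(events, max_delay):
    """Yield (timestamp, payload) pairs in timestamp order.

    Assumption (hypothetical): no event arrives more than max_delay
    behind the largest timestamp seen so far. An event that violates
    this would be emitted out of order.
    """
    buffer = []                    # min-heap ordered by timestamp
    watermark = float("-inf")      # everything <= watermark is safe to emit
    for seq, (ts, payload) in enumerate(events):
        # seq breaks ties so payloads are never compared directly
        heapq.heappush(buffer, (ts, seq, payload))
        watermark = max(watermark, ts - max_delay)
        while buffer and buffer[0][0] <= watermark:
            ts_out, _, payload_out = heapq.heappop(buffer)
            yield ts_out, payload_out
    # end of (finite) input: flush whatever is still buffered
    while buffer:
        ts_out, _, payload_out = heapq.heappop(buffer)
        yield ts_out, payload_out
```

Sorting by an arbitrary field such as price offers no comparable bound: a new row could belong anywhere in the result, including before rows that were already emitted.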
In the future (Flink 1.6 or later), we will also support queries like ORDER BY x ASC LIMIT 10, which will result in an updating table that contains the records with the 10 smallest x values.
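As a rough illustration of what an "updating table" means here, the following plain-Python sketch (not Flink code; the function name is made up for this example) keeps the k smallest values seen so far and emits a new version of the result whenever that set changes.

```python
import bisect

def top_k_smallest_updates(values, k):
    """Yield the current k smallest values each time the result changes."""
    smallest = []  # kept sorted ascending, never longer than k
    for x in values:
        # x changes the result only if there is room or it beats the current maximum
        if len(smallest) < k or x < smallest[-1]:
            bisect.insort(smallest, x)
            del smallest[k:]       # drop the value that fell out of the top k
            yield list(smallest)
```

Because the result holds at most k rows, each update stays cheap, which is why a LIMIT makes a non-time ORDER BY tractable on a stream.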
Anyway, you cannot (easily) compute a top-k ranking per minute using a GROUP BY tumbling window. GROUP BY queries aggregate the records of a group (and of a window, in the case of GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE)) into a single record. So there won't be multiple records per minute but just one.
If you'd like a query to compute the top 10 on field a per minute, you would need a query similar to this one:
SELECT a, b, c
FROM (
  SELECT
    a, b, c,
    RANK() OVER (
      PARTITION BY CEIL(t TO MINUTE)
      ORDER BY a
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rnk
  FROM yourTable)
WHERE rnk <= 10
However, such queries are not yet supported by Flink (version 1.4) because the time attribute is used in the PARTITION BY clause and not the ORDER BY clause of the OVER window.
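For reference, the logic asked for in the question can be sketched outside SQL. The plain-Python example below is not Flink code, and the function name and tuple layout are assumptions made for illustration. It keeps the latest price per stock and, each time a minute boundary is crossed, emits a ranking of all stocks seen so far, including those that were not updated in that minute.

```python
def rank_latest_prices(ticks, interval=60):
    """Yield (window_end, ranking) once per elapsed interval.

    ticks: iterable of (timestamp_seconds, symbol, price), assumed to be
    in timestamp order. ranking: list of (symbol, price), highest first.
    A ranking is only emitted when a later tick crosses the boundary;
    a real job would use a timer instead.
    """
    latest = {}        # symbol -> most recent price
    next_emit = None   # end of the current one-interval window
    for ts, symbol, price in ticks:
        if next_emit is None:
            next_emit = (ts // interval + 1) * interval
        while ts >= next_emit:
            # rank every known stock, updated in this window or not
            ranking = sorted(latest.items(), key=lambda kv: (-kv[1], kv[0]))
            yield next_emit, ranking
            next_emit += interval
        latest[symbol] = price
```

In a Flink job, the same state-plus-timer pattern would have to live in a custom function rather than in the SQL query; how best to do that depends on the Flink version.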
Upvotes: 1