Reputation: 221
I have a MySQL table that is updated frequently. For each id that was updated in the past 20 seconds, I want to take a snapshot and write the value into Redis. I use the binlog as streaming input and convert the DataStream into a Flink table. I run the following SQL.
SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
SELECT id, MAX(ts)
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)
Since I know that a table join can build up excessive state, I set the StreamQueryConfig as follows:
qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
I ran the job for one day and got an out-of-memory error. How can I solve this problem?
Upvotes: 0
Views: 979
Reputation: 18987
You can also solve this with a time-windowed join instead of a regular join with a configured idle state retention time.
The following query should do the trick.
SELECT m1.id, m1.ts, m1.val
FROM my_tbl m1,
(SELECT id, MAX(ts) AS ts, TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) AS ptime
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts AND
m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime
The windowed join predicate (BETWEEN) ensures that the state is automatically cleaned up. Since you are using processing time, which is not exact, I've added 5 seconds of slack.
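To see why a time bound on the join keeps state small, here is a rough conceptual sketch in Python. It is not how Flink implements the join internally; `BoundedJoinBuffer` and `simulate` are hypothetical names made up for this illustration. It only models the idea that a row can be dropped once no future row on the other side could still match it within the 25-second bound.

```python
from collections import deque


class BoundedJoinBuffer:
    """Toy model of one input side of a time-windowed join (hypothetical, not Flink code)."""

    def __init__(self, max_age_seconds):
        self.max_age = max_age_seconds
        self.rows = deque()  # (proctime, id, ts, val), in arrival order

    def add(self, proctime, row_id, ts, val):
        self.rows.append((proctime, row_id, ts, val))

    def evict(self, now):
        # A row older than the join bound can never satisfy the
        # BETWEEN predicate again, so it is safe to drop it.
        while self.rows and self.rows[0][0] < now - self.max_age:
            self.rows.popleft()

    def __len__(self):
        return len(self.rows)


def simulate(total_seconds, max_age_seconds):
    """Feed one row per second and return the peak number of buffered rows."""
    buf = BoundedJoinBuffer(max_age_seconds)
    peak = 0
    for now in range(total_seconds):
        buf.add(now, now % 10, now, float(now))
        buf.evict(now)
        peak = max(peak, len(buf))
    return peak


if __name__ == "__main__":
    print(simulate(3600, 25))      # bounded: state stays small
    print(simulate(3600, 10**9))   # effectively unbounded: state grows with the input
```

With one row per second over an hour, the bounded buffer never holds more than 26 rows, while the effectively unbounded one ends up holding all 3600. A regular join behaves more like the second case until the idle state retention timer fires, which is why the state can grow large on a busy table.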
Upvotes: 1