lgbo

Reputation: 221

Flink SQL: out of memory when joining tables

I have a MySQL table that is updated frequently. I want to take a snapshot of each id updated in the past 20 seconds and write the values into Redis. I use the binlog as a streaming input, transform the DataStream into a Flink table, and run the following SQL:

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
   SELECT id, MAX(ts)
   FROM my_tbl
   GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)

Since I know that a regular table join can produce excessive state, I set the StreamQueryConfig as follows:

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
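For intuition, `withIdleStateRetentionTime(min, max)` means a key's state survives at least `min` after its last access and is guaranteed to be gone after `max`, with cleanup happening lazily somewhere in between. The following is only an illustrative Python sketch of that eviction rule, not Flink code; the class and method names are made up for the example:

```python
class IdleStateRetention:
    """Sketch of withIdleStateRetentionTime(min_ms, max_ms) semantics:
    state for a key is kept at least min_ms after its last access and
    dropped once it has been idle longer than max_ms."""

    def __init__(self, min_ms, max_ms):
        self.min_ms = min_ms
        self.max_ms = max_ms
        self.state = {}        # key -> value
        self.last_access = {}  # key -> last access time (ms)

    def update(self, key, value, now_ms):
        # every access refreshes the idle timer for that key
        self.state[key] = value
        self.last_access[key] = now_ms

    def cleanup(self, now_ms):
        # evict keys idle longer than max_ms (the latest allowed cleanup point)
        expired = [k for k, t in self.last_access.items()
                   if now_ms - t > self.max_ms]
        for k in expired:
            del self.state[k]
            del self.last_access[k]
```

Note the catch: keys that keep being updated are never idle, so their state is retained indefinitely; retention time alone does not bound state for hot keys.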

After running the job for about a day, I get an out-of-memory error. How can I solve this problem?

Upvotes: 0

Views: 979

Answers (1)

Fabian Hueske

Reputation: 18987

You can also solve this with a time-windowed join instead of a regular join with a configured idle state retention time.

The following query should do the trick.

SELECT m1.id, m1.ts, m1.val
FROM my_tbl m1,
     (SELECT id, MAX(ts) AS ts, TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) AS ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts AND
      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

The windowed join predicate (BETWEEN) ensures that the state is automatically cleaned up. Since you are using processing time, which is not exact, I've added 5 seconds of slack.
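The reason the bounded predicate helps: each join side only needs to buffer rows whose timestamp can still fall inside the window relative to incoming rows, so anything older can be evicted eagerly instead of accumulating forever. A minimal Python sketch of that eviction logic (an illustration only, not Flink internals; the class is hypothetical):

```python
from collections import defaultdict

WINDOW_MS = 25_000  # matches the 25-second bound in the query above

class WindowedJoinBuffer:
    """Sketch of why a time-bounded join keeps state small: rows outside
    the join window can never match a future row, so they are dropped."""

    def __init__(self):
        self.rows = defaultdict(dict)  # key -> {ts: value}

    def insert(self, key, ts, value):
        self.rows[key][ts] = value

    def evict(self, now_ms):
        # drop rows too old to satisfy `ts BETWEEN now - WINDOW AND now`
        for key, buffered in self.rows.items():
            self.rows[key] = {t: v for t, v in buffered.items()
                              if t >= now_ms - WINDOW_MS}

    def total_rows(self):
        return sum(len(b) for b in self.rows.values())
```

With this bound, state size depends on the event rate within one window, not on the total number of distinct keys seen over the job's lifetime.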

Upvotes: 1
