Reputation: 1
I attempted to do something similar to the example provided in the latest release docs.
Below is the query whose result will be sent to a Kafka sink output table:
SELECT cdr.org_id orgid, cdr.cluster_id clusterid, cdr.c_id cid,
MIN(scalarGradeFunc(cmr.metrics, cdr.duration, cmr.duration)) grade
FROM (SELECT * FROM TABLE(TUMBLE(TABLE cdrTable, DESCRIPTOR(proctime), INTERVAL '1' MINUTES))) cdr
LEFT JOIN
(SELECT * FROM TABLE(TUMBLE(TABLE cmrTable, DESCRIPTOR(proctime), INTERVAL '1' MINUTES))) cmr
ON (cdr.org_id = cmr.org_id AND cdr.cluster_id = cmr.cluster_id AND cdr.c_id = cmr.c_id )
AND cdr.window_start = cmr.window_start AND cdr.window_end = cmr.window_end
GROUP BY cdr.org_id, cdr.cluster_id, cdr.c_id;
I am trying to use a window TVF to join the two tables as a window join, and since I have a MIN aggregate function in the SELECT, I used a GROUP BY for the aggregation. When I try it out, I get the error below:
'default_catalog.default_database.outputCdrTable' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[orgid, ...])
I tried one more approach (Group Window Aggregation), but that seems to be deprecated per the release notes: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
Please help me with this!
Upvotes: 0
Views: 402
Reputation: 1
It is resolved now; I used an upsert-kafka sink with a primary key.
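For reference, a minimal sketch of what such a sink definition can look like. The column types, topic name, bootstrap servers, and formats below are assumptions (adjust them to your setup); the key point is that the PRIMARY KEY matches the GROUP BY key of the query, so the upsert-kafka connector can consume the update changes produced by the GroupAggregate:

CREATE TABLE outputCdrTable (
  orgid     STRING,
  clusterid STRING,
  cid       STRING,
  grade     DOUBLE,
  -- primary key = the grouping key of the aggregation query
  PRIMARY KEY (orgid, clusterid, cid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'cdr-grades',                           -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092', -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

With this sink, the INSERT INTO outputCdrTable SELECT ... query from the question can write its retract/update stream as upserts keyed on (orgid, clusterid, cid).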
Upvotes: 0