Reputation: 1
I am facing issues while trying to achieve deduplication using Flink SQL. When constructing and insert data into another table backing a stream, I am getting duplicate records. To over come that I am trying deduplication but facing issue.
When constructing and insert data into another table backing a stream, I am getting duplicate records. To over come that I am trying deduplication but facing this error - ** TableException: Table sink 'hive.default.consolidated_notifications' doesn't support consuming update and delete changes which is produced by node**
%flink.ssql
INSERT INTO consolidated_notifications
SELECT
device_id,
notification_text,
window_start,
window_end
FROM (
SELECT
i.device_id,
CASE
WHEN component = 'grill' THEN notification_text
WHEN component IN ('probe_a', 'probe_a_volt', 'probe_a_ambient') THEN 'Probe A: ' || notification_text
WHEN component IN ('probe_b', 'probe_b_volt', 'probe_b_ambient') THEN 'Probe B: ' || notification_text
WHEN component IN ('probe_c', 'probe_c_volt', 'probe_c_ambient') THEN 'Probe C: ' || notification_text
WHEN component IN ('probe_d', 'probe_d_volt', 'probe_d_ambient') THEN 'Probe D: ' || notification_text
END AS notification_text,
i.event_time AS window_start,
i.event_time + INTERVAL '1' MINUTE AS window_end,
ROW_NUMBER() OVER (
PARTITION BY
i.device_id,
notification_text,
TUMBLE(i.event_time, INTERVAL '1' MINUTE)
ORDER BY i.event_time
) AS rn
FROM IoTDataStream i
JOIN MobileAppStream m ON i.device_id = m.id
CROSS JOIN UNNEST(ARRAY[
ROW('grill',
CASE
WHEN i.temperature < i.set_point THEN 'Grill preheating. Current: ' || CAST(i.temperature AS STRING) || '°F.'
ELSE 'Grill temperature deviated. Current: ' || CAST(i.temperature AS STRING) || '°F.'
END),
ROW('probe_a',
CASE
WHEN i.probe_a < m.probe_a_threshold THEN 'Probe A approaching target. Current: ' || CAST(i.probe_a AS STRING) || '°F.'
ELSE 'Probe A deviated. Current: ' || CAST(i.probe_a AS STRING) || '°F.'
END),
-- Similar rows for other probes
]) AS t(component, notification_text)
WHERE notification_text IS NOT NULL
AND i.event_time IS NOT NULL
)
WHERE rn = 1;
any ideas to overcome this issue?
Upvotes: 0
Views: 36