Bayman2617

Deduplication in a Flink SQL notebook

I am having trouble deduplicating records with Flink SQL. When I build records and insert them into another table that backs a stream, I get duplicate records. To overcome that, I am trying to deduplicate them with `ROW_NUMBER()`, but the insert fails with this error:

`TableException: Table sink 'hive.default.consolidated_notifications' doesn't support consuming update and delete changes which is produced by node`

%flink.ssql
INSERT INTO consolidated_notifications
 SELECT 
   device_id,
   notification_text,
   window_start,
   window_end
 FROM (
   SELECT 
       i.device_id,
       CASE 
           WHEN component = 'grill' THEN notification_text
           WHEN component IN ('probe_a', 'probe_a_volt', 'probe_a_ambient') THEN 'Probe A: ' || notification_text
           WHEN component IN ('probe_b', 'probe_b_volt', 'probe_b_ambient') THEN 'Probe B: ' || notification_text
           WHEN component IN ('probe_c', 'probe_c_volt', 'probe_c_ambient') THEN 'Probe C: ' || notification_text
           WHEN component IN ('probe_d', 'probe_d_volt', 'probe_d_ambient') THEN 'Probe D: ' || notification_text
       END AS notification_text,
       i.event_time AS window_start,
       i.event_time + INTERVAL '1' MINUTE AS window_end,
       ROW_NUMBER() OVER (
           PARTITION BY 
               i.device_id, 
               notification_text,
               TUMBLE(i.event_time, INTERVAL '1' MINUTE)
           ORDER BY i.event_time
       ) AS rn
   FROM IoTDataStream i
   JOIN MobileAppStream m ON i.device_id = m.id
   CROSS JOIN UNNEST(ARRAY[
       ROW('grill', 
           CASE 
               WHEN i.temperature < i.set_point THEN 'Grill preheating. Current: ' || CAST(i.temperature AS STRING) || '°F.'
               ELSE 'Grill temperature deviated. Current: ' || CAST(i.temperature AS STRING) || '°F.'
           END),
       ROW('probe_a', 
           CASE 
               WHEN i.probe_a < m.probe_a_threshold THEN 'Probe A approaching target. Current: ' || CAST(i.probe_a AS STRING) || '°F.'
               ELSE 'Probe A deviated. Current: ' || CAST(i.probe_a AS STRING) || '°F.'
           END),
       -- Similar rows for other probes
   ]) AS t(component, notification_text)
   WHERE notification_text IS NOT NULL
   AND i.event_time IS NOT NULL
 ) 
 WHERE rn = 1;

Any ideas on how to overcome this issue?
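One pattern worth trying is Flink's window deduplication, which applies `ROW_NUMBER()` on top of a windowing table-valued function (TVF) instead of calling `TUMBLE(...)` inside `PARTITION BY`. According to the Flink documentation, window deduplication does not emit intermediate results, only a final row per key at the end of each window, so its output should be insert-only, which is what an append-only sink such as this Hive table requires. This is a minimal, untested sketch under several assumptions: a Flink release that supports windowing TVFs and window deduplication (assumed 1.14 or later); a hypothetical view `notifications_v` exposing `device_id`, `notification_text` and `event_time`; and `event_time` still being a rowtime attribute (declared with a `WATERMARK` on the source) at that point. A regular join such as `JOIN MobileAppStream` in the query above may not preserve the time attribute, which is a likely reason the original `ROW_NUMBER()` was planned as an updating Top-N rather than as a deduplication.

```sql
-- Sketch only. `notifications_v` is a HYPOTHETICAL view with columns
-- device_id, notification_text and event_time, where event_time is
-- assumed to still be a rowtime attribute (source declares a WATERMARK).
INSERT INTO consolidated_notifications
SELECT
  device_id,
  notification_text,
  window_start,
  window_end
FROM (
  SELECT
    device_id,
    notification_text,
    window_start,
    window_end,
    -- Keep the earliest row per device, text and one-minute window.
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, device_id, notification_text
      ORDER BY event_time ASC
    ) AS rn
  FROM TABLE(
    -- Windowing TVF: adds window_start, window_end and window_time columns.
    TUMBLE(TABLE notifications_v, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
)
WHERE rn <= 1;
```

Note that here `window_start` and `window_end` are the boundaries of the aligned one-minute tumbling window, not `event_time` and `event_time + INTERVAL '1' MINUTE` as in the original query, so the sink columns would carry slightly different values.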