I have two streams that I want to join in a LEFT JOIN fashion: I just want to enrich my left-side stream with data from the right one. Let's say my left stream is car_traffic and my right stream is car_electronics. license_plate_number is a field common to both streams.
From car_electronics I only keep license_plate_number and gps_mac_addr, because the MAC address changes constantly, but not every car is equipped with a GPS module. I filter out the NULL values and then turn the stream into a versioned table view.
The main idea is to have, on the right side, a reference table of cars with their gps_mac_addr, and to enrich the left side with every known MAC address while keeping the rows that have no match.
The two streams move at different speeds.
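A minimal sketch of such a versioned view in Flink SQL follows. It uses the deduplication pattern (ROW_NUMBER ordered by event time, keeping row 1) that the Flink documentation describes for versioned views. It assumes the raw car_electronics events arrive append-only and declare a watermark on eventTime. The view name car_electronics_versioned is hypothetical.

```sql
-- Hypothetical view name. Assumes car_electronics is append-only and declares
-- a watermark on eventTime, so eventTime is an event-time attribute.
CREATE VIEW car_electronics_versioned AS
SELECT license_plate_number, gps_mac_addr, eventTime
FROM (
  SELECT *,
    -- rank rows per plate by event time; keeping row_num = 1 yields one
    -- current version per plate, with license_plate_number as unique key
    ROW_NUMBER() OVER (
      PARTITION BY license_plate_number
      ORDER BY eventTime DESC) AS row_num
  FROM car_electronics
  WHERE gps_mac_addr IS NOT NULL  -- drop cars without a GPS module
)
WHERE row_num = 1;
```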
My questions:
We are using Flink 1.14.6.
Each stream sends around 1.6 to 2 billion records per day.
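For throughput and state sizing, that daily volume corresponds to an average rate per stream of roughly the following. This is averaged over 86,400 seconds, so peak rates will be higher:

```latex
\frac{1.6 \times 10^{9}}{86\,400\ \text{s}} \approx 18\,500\ \text{records/s},
\qquad
\frac{2 \times 10^{9}}{86\,400\ \text{s}} \approx 23\,100\ \text{records/s}
```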
Example data:
car_traffic
+----------------------+----------------+-----+
| license_plate_number | eventTime      | ... |
+----------------------+----------------+-----+
| AA-123-BB            | 2022-11-29 ... | ... |
| AA-456-CC            | 2022-11-29 ... | ... |
| EE-935-JJ            | 2022-11-29 ... | ... |
+----------------------+----------------+-----+
car_electronics
+----+----------------------+--------------+----------------+
| op | license_plate_number | gps_mac_addr | eventTime      |
+----+----------------------+--------------+----------------+
| +I | AA-123-BB            | AA           | 2022-11-28 ... |
| -U | AA-123-BB            | AA           | 2022-11-29 ... |
| +U | AA-123-BB            | FFFF0A0FBBC6 | 2022-11-29 ... |
| +I | AA-456-CC            | FFFF0A0F00F0 | 2022-11-29 ... |
+----+----------------------+--------------+----------------+
Result I want:
+----------------------+--------------+-----------+
| license_plate_number | gps_mac_addr | eventTime |
+----------------------+--------------+-----------+
| AA-123-BB            | FFFF0A0FBBC6 | ...       |
| AA-456-CC            | FFFF0A0F00F0 | ...       |
| EE-935-JJ            | (NULL)       | ...       |
+----------------------+--------------+-----------+
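One way to express this in Flink SQL is an event-time temporal LEFT JOIN against the versioned view. The following is a sketch under assumptions, not a verified solution:

- car_traffic declares a watermark on eventTime.
- car_electronics_versioned (a hypothetical name) is the filtered versioned view described above, keyed on license_plate_number.

With a temporal join, each car_traffic row picks up the gps_mac_addr version that was valid at that row's own eventTime, which is not necessarily the latest one. Plates with no known MAC, such as EE-935-JJ, get NULL. Results are emitted only once the watermarks of both inputs pass a row's time, so the slower stream holds back output.

```sql
-- Assumes car_traffic has a watermark on eventTime and that
-- car_electronics_versioned (hypothetical) is a versioned view with
-- primary key license_plate_number and event-time attribute eventTime.
SELECT
  t.license_plate_number,
  e.gps_mac_addr,  -- NULL when no MAC is known for the plate at that time
  t.eventTime
FROM car_traffic AS t
LEFT JOIN car_electronics_versioned FOR SYSTEM_TIME AS OF t.eventTime AS e
  ON t.license_plate_number = e.license_plate_number;
```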
If car_electronics is a database table, you may be able to use the Flink CDC project to capture its changelog as a Flink source.
You can then implement the requirement with either the DataStream API or SQL.
For SQL:
CREATE TABLE car_electronics (
  license_plate_number STRING,
  gps_mac_addr STRING,
  eventTime TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  ......
);
SELECT c1.license_plate_number, c2.gps_mac_addr, c1.eventTime
FROM car_traffic c1
LEFT JOIN car_electronics c2 ON c1.license_plate_number = c2.license_plate_number;
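By default, a regular (non-temporal) LEFT JOIN like the one above keeps rows from both inputs in Flink state indefinitely. At 1.6 to 2 billion records per day per stream, that state can grow without bound.

One option is to set an idle-state retention time through the table.exec.state.ttl option. The sketch below uses Flink SQL client syntax. The one-day value is an arbitrary assumption. Once a row's state has expired, later changes from the other side can no longer be matched against it.

```sql
-- Assumed retention: drop join state not accessed for one day.
-- Choose a value covering how far apart matching rows can arrive.
SET 'table.exec.state.ttl' = '1 d';
```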