Reputation: 659
I am using the community edition of Confluent Platform version 5.4.1. I did not find any CLI command that prints the KSQL Server version, but what I see when I enter KSQL can be found in the attached screenshot.
I have a geofence table -
CREATE TABLE GEOFENCE (GEOFENCEID INT,
FLEETID VARCHAR,
GEOFENCECOORDINATES VARCHAR)
WITH (KAFKA_TOPIC='MONGODB-GEOFENCE',
VALUE_FORMAT='JSON',
KEY= 'GEOFENCEID');
The data arrives in the Geofence KSQL table from the Kafka MongoDB source connector whenever an insert or update is performed on the geofence MongoDB collection by a web application backed by a REST API. I made geofence a table because tables are mutable, so it always holds the latest geofence information. Inserts and updates are infrequent, and since the key is GeofenceId, any change in the Geofence MongoDB collection updates the matching row in the Geofence KSQL table.
I have a live stream of vehicle position -
CREATE STREAM VEHICLE_POSITION (VEHICLEID INT,
FLEETID VARCHAR,
LATITUDE DOUBLE,
LONGITUDE DOUBLE)
WITH (KAFKA_TOPIC='VEHICLE_POSITION',
VALUE_FORMAT='JSON');
I want to join the table and the stream like this -
CREATE STREAM VEHICLE_DISTANCE_FROM_GEOFENCE AS
SELECT GF.GEOFENCEID,
GF.FLEETID,
VP.VEHICLEID,
GEOFENCE_UDF(GF.GEOFENCECOORDINATES, VP.LATITUDE, VP.LONGITUDE)
FROM GEOFENCE GF
LEFT JOIN VEHICLE_POSITION VP
ON GF.FLEETID = VP.FLEETID;
But KSQL will not allow me to do this because I am joining on FLEETID, which is a non-row-key column. This would have been possible in SQL, but how do I achieve it in KSQL?
Note: According to my application's business logic Fleet Id is used to combine Geofences and Vehicles belonging to a fleet.
Sample data for table -
INSERT INTO GEOFENCE
(GEOFENCEID, FLEETID, GEOFENCECOORDINATES)
VALUES (10, '123abc', '52.4497_13.3096');
Sample data for stream -
INSERT INTO VEHICLE_POSITION
(VEHICLEID, FLEETID, LATITUDE, LONGITUDE)
VALUES (1289, '125abc', 57.7774, 12.7811);
Upvotes: 4
Views: 1937
Reputation: 1893
To solve your problem, what you need is a table of FLEETID to GEOFENCECOORDINATES. You could join such a table to your VEHICLE_POSITION stream to get the result you need.
So, how do you get a table of FLEETID to GEOFENCECOORDINATES?
The simple answer is that you can't with your current table definition! You declare the table as having only GEOFENCEID as the primary key, yet a fleetId can have many fences. To be able to model this, both GEOFENCEID and FLEETID would need to be part of the primary key of the table.
Consider the example:
INSERT INTO GEOFENCE VALUES (10, 'fleet-1', 'coords-1');
INSERT INTO GEOFENCE VALUES (10, 'fleet-2', 'coords-2');
After running these two inserts, the table would contain only a single row, with key 10 and value 'fleet-2', 'coords-2'.
Even if we could somehow capture the above information in a table, consider what happens if there is a tombstone in the topic because the first row has been deleted from the source Mongo collection. A tombstone is a record with the key (10) and a null value. ksqlDB would then remove the row with key 10 from its table, leaving an empty table.
This is the crux of your problem!
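The overwrite and tombstone behaviour described above can be imitated with an ordinary map. This is only a rough sketch in plain Java, not ksqlDB code: the class and method names are made up for illustration, and the `10|fleet-1` composite key format is an assumption standing in for whatever key the connector ends up producing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of table semantics: each record is an upsert by key,
// and a record with a null value (a tombstone) deletes the row for that key.
class TableKeySketch {

    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);      // tombstone: drop the row
        } else {
            table.put(key, value);  // upsert: insert or overwrite the row
        }
    }

    // Key is GEOFENCEID only: the second record overwrites the first.
    static Map<String, String> keyedByFenceOnly() {
        Map<String, String> table = new LinkedHashMap<>();
        apply(table, "10", "fleet-1,coords-1");
        apply(table, "10", "fleet-2,coords-2");
        return table;
    }

    // Key is GEOFENCEID plus FLEETID (assumed "id|fleet" format): both rows
    // survive, and a tombstone removes only the row it names.
    static Map<String, String> keyedByFenceAndFleet() {
        Map<String, String> table = new LinkedHashMap<>();
        apply(table, "10|fleet-1", "coords-1");
        apply(table, "10|fleet-2", "coords-2");
        apply(table, "10|fleet-1", null);
        return table;
    }

    public static void main(String[] args) {
        System.out.println(keyedByFenceOnly());      // {10=fleet-2,coords-2}
        System.out.println(keyedByFenceAndFleet());  // {10|fleet-2=coords-2}
    }
}
```

With the single-column key the fleet-1 fence is lost as soon as the fleet-2 record arrives; with both ids in the key, the delete of the fleet-1 row leaves the fleet-2 row intact.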
First, you'll need to configure the source connector to get both the fence id and fleet id into the key of the messages.
Next, you'll need to access this in ksqlDB. Unfortunately, ksqlDB, as of version 0.10.0 / CP 6.0.0, doesn't support multiple key columns, though this is being worked on.
In the meantime, if your key is a JSON document containing the two key fields, e.g.
{
"GEOFENCEID": 10,
"FLEETID": "fleet-1"
}
Then you can import it into ksqlDB as a STRING:
-- 5.4.1 syntax:
-- ROWKEY will contain the JSON document, containing GEOFENCEID and FLEETID
CREATE TABLE GEOFENCE (
GEOFENCECOORDINATES VARCHAR
)
WITH (
KAFKA_TOPIC='MONGODB-GEOFENCE',
VALUE_FORMAT='JSON'
);
-- 6.0.0 syntax:
CREATE TABLE GEOFENCE (
JSONKEY STRING PRIMARY KEY,
GEOFENCECOORDINATES VARCHAR
)
WITH (
KAFKA_TOPIC='MONGODB-GEOFENCE',
VALUE_FORMAT='JSON'
);
With the table now correctly defined, you can use EXTRACTJSONFIELD to access the data in the JSON key and collect all the fence coordinates using COLLECT_SET. I'm not 100% sure this would work on 5.4.1 (see how you get on), but it will on 6.0.0.
-- 6.0.0 syntax
CREATE TABLE FLEET_COORDS AS
SELECT
EXTRACTJSONFIELD(JSONKEY, '$.FLEETID') AS FLEETID,
COLLECT_SET(GEOFENCECOORDINATES)
FROM GEOFENCE
GROUP BY EXTRACTJSONFIELD(JSONKEY, '$.FLEETID');
This will give you a table of fleetId to a set of fence coordinates. You can use this to join to your vehicle position stream. Of course, your GEOFENCE_UDF will need to accept an ARRAY<STRING> for the fence coordinates, as there may be many.
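As a rough idea of what the array-accepting version could look like, here is a minimal sketch of the core logic in plain Java. Everything in it is an assumption, since the question doesn't show what GEOFENCE_UDF actually computes: it treats each fence as a single `lat_lon` point (the underscore format from the sample row), uses the haversine formula, and returns the distance in kilometres to the nearest fence. The class and method names are hypothetical, and in a real ksqlDB UDF the class and method would also need the `@UdfDescription` and `@Udf` annotations.

```java
import java.util.List;

// Hypothetical core logic for a UDF that takes ARRAY<STRING> fence coordinates.
// Assumes every entry is "latitude_longitude"; the real format may differ.
class GeofenceDistanceSketch {

    static final double EARTH_RADIUS_KM = 6371.0;

    // Great-circle distance between two points, using the haversine formula.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Distance to the closest fence, or null when the fleet has no fences
    // (for example, when a LEFT JOIN finds no matching table row).
    static Double nearestFenceKm(List<String> fenceCoordinates, double lat, double lon) {
        if (fenceCoordinates == null || fenceCoordinates.isEmpty()) {
            return null;
        }
        double best = Double.POSITIVE_INFINITY;
        for (String coords : fenceCoordinates) {
            String[] parts = coords.split("_");
            double fenceLat = Double.parseDouble(parts[0]);
            double fenceLon = Double.parseDouble(parts[1]);
            best = Math.min(best, haversineKm(fenceLat, fenceLon, lat, lon));
        }
        return best;
    }

    public static void main(String[] args) {
        // Sample values from the question: one fence, one vehicle position.
        System.out.println(nearestFenceKm(List.of("52.4497_13.3096"), 57.7774, 12.7811));
    }
}
```

Returning null for an empty or missing list keeps unmatched rows distinguishable from a vehicle that sits exactly on a fence point, which would give 0.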
Good luck!
Upvotes: 1