Reputation: 1
I have a topic in Kafka called event1 which has columns like 'id', 't', 'lookup', 'version'... t is the timestamp value for the event. I have created a stream from it using KSQL, and I need to get all the lookup values for a given id for the last 5 minutes. What is the appropriate way to do this? I have tried creating a stream and windowing it for 5 minutes, but none of my approaches seem to work.
Stream:
CREATE stream event1_stream (id varchar, t bigint, cVersion varchar, cdVersion varchar, lookup varchar, column1 varchar, column2 varchar, column3 varchar, column4 varchar) WITH (kafka_topic='event1', value_format='JSON');
Table:
CREATE TABLE event1_lookup_table
AS SELECT CID,T,LOOKUP,count(*)
FROM EVENT1_STREAM
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 30 SECONDS)
GROUP BY CID,T,LOOKUP
EMIT CHANGES;
I only need the last x minutes of data from this topic in my application. Does anyone have an idea how to do it? Ultimately, I want to be able to query the last 5 minutes of lookup data for a specific id using ksql. Thanks for your help.
Upvotes: 0
Views: 989
Reputation: 41
Instead of using time windows, use the ROWTIME pseudocolumn, which contains the timestamp of the Kafka record for each event (you can also change which timestamp is used: https://docs.ksqldb.io/en/latest/how-to-guides/use-a-custom-timestamp-column/).
Then use the UNIX_TIMESTAMP function, which returns the current time in milliseconds when no timestamp is passed as a parameter (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#unix_timestamp), and subtract the desired interval (in milliseconds) from it to return the records you require.
SELECT * FROM event1_stream WHERE ROWTIME>UNIX_TIMESTAMP()-60000;
Here 60000 is the number of milliseconds in one minute, so the query returns all the events that occurred in the minute before the query runs. For the five minutes asked about in the question, use 300000 (5 x 60 x 1000) instead.
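Applied to the question, the same idea could look roughly like the sketch below. It is only an illustration under a few assumptions: the id value 'abc123' and the stream name event1_by_t are made up, the t column is assumed to hold epoch milliseconds, and the trailing EMIT CHANGES makes the second statement a push query (older ksqlDB versions require it when querying a stream, while newer ones may also accept the query without it as a pull query).

```sql
-- Optional (assumption: t holds epoch milliseconds): declare a stream whose
-- ROWTIME is taken from the t column instead of the Kafka record timestamp.
-- The stream name event1_by_t is hypothetical.
CREATE STREAM event1_by_t (id VARCHAR, t BIGINT, lookup VARCHAR)
  WITH (kafka_topic='event1', value_format='JSON', timestamp='t');

-- Lookup values for one id over the last 5 minutes.
-- 300000 ms = 5 * 60 * 1000; 'abc123' is a placeholder id.
SELECT id, lookup, t
FROM event1_by_t
WHERE id = 'abc123'
  AND ROWTIME > UNIX_TIMESTAMP() - 300000
EMIT CHANGES;
```

If you keep the original event1_stream, the same WHERE clause applies, but ROWTIME then reflects the Kafka record timestamp rather than the value of t.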
Upvotes: 4