Mehmet Aydogan

Reputation: 1

How to get the last X minutes of events in a KSQL topic?

I have a Kafka topic called event1 with fields such as 'id', 't', 'lookup', 'version'... where t is the event's timestamp. I have created a stream from it using KSQL, and I need to get all the lookup values for a given id over the last 5 minutes. What is the appropriate way to do this? I have tried creating a table with a 5-minute window over the stream, but none of my approaches seem to work.

Stream:

CREATE stream event1_stream (id varchar, t bigint, cVersion varchar, cdVersion varchar, lookup varchar, column1 varchar, column2 varchar, column3 varchar, column4 varchar) WITH (kafka_topic='event1', value_format='JSON');

Table:

CREATE TABLE event1_lookup_table AS
  SELECT ID, T, LOOKUP, COUNT(*)
  FROM EVENT1_STREAM
  WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 30 SECONDS)
  GROUP BY ID, T, LOOKUP
  EMIT CHANGES;

I only need the last x minutes of data from this topic in my application. Does anyone have an idea how to do this? Ultimately, I want to be able to query the last 5 minutes of lookup data for a specific id using KSQL. Thanks for your help.

Upvotes: 0

Views: 989

Answers (1)

jdavidortega

Reputation: 41

Instead of using time windows, use the ROWTIME pseudocolumn, which holds the timestamp of each record. By default this is the timestamp Kafka assigned to the record, but you can also take it from one of your own columns: https://docs.ksqldb.io/en/latest/how-to-guides/use-a-custom-timestamp-column/

Then call the UNIX_TIMESTAMP function with no argument (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#unix_timestamp) to get the current time in milliseconds, and subtract the interval you want (also in milliseconds) to keep only the records newer than that cutoff.

SELECT * FROM event1_stream WHERE ROWTIME>UNIX_TIMESTAMP()-60000;

Here 60000 is one minute expressed in milliseconds, so the query returns all the events that occurred in the minute before the query. For the last 5 minutes, subtract 300000 (5 * 60 * 1000) instead.
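Applied to the question, a minimal sketch might look like the following. It assumes a recent ksqlDB version where a query over a stream is a push query ending in EMIT CHANGES, and 'some-id' is a hypothetical placeholder for the id you are interested in. As far as I understand, UNIX_TIMESTAMP() is evaluated as each record is processed, so the cutoff moves forward while the query keeps running.

```sql
-- Read the topic from the start so records that already exist are considered.
SET 'auto.offset.reset' = 'earliest';

-- Lookup values for one id from roughly the last 5 minutes.
-- 5 * 60 * 1000 = 300000 ms; 'some-id' is a hypothetical placeholder.
SELECT id, lookup
FROM event1_stream
WHERE id = 'some-id'
  AND ROWTIME > UNIX_TIMESTAMP() - 5 * 60 * 1000
EMIT CHANGES;
```

If the t field already holds the event time as epoch milliseconds (an assumption, the question does not say), the same filter could compare t instead of ROWTIME.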

Upvotes: 4
