shakeel

Reputation: 901

Get Latest value from Kafka

I have a Kafka topic called A.

The data in topic A has the following format:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

On the consumer side, I want only the latest record in each one-hour window. That is, every hour I need to get the record with the most recent created_at from the topic.

My expected output is :

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

I think this can be solved with KSQL, but I'm not sure. Please help me.

Thanks in advance.

Upvotes: 3

Views: 2246

Answers (1)

Hojjat

Reputation: 684

Yes, you can use KSQL for this. Try the following:

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAR)
WITH (kafka_topic = 'topic_name', value_format = 'JSON');

-- MM = month and HH = 24-hour clock hour (lowercase mm is minutes, hh is the 12-hour clock)
CREATE TABLE maxRow AS
SELECT
    id,
    name,
    MAX(STRINGTOTIMESTAMP(created_at, 'yyyy-MM-dd HH:mm:ss.SSS')) AS created_at
FROM S1
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY id, name;

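The result will contain created_at as a Unix epoch timestamp in milliseconds (a BIGINT). You can convert it to your desired format with the TIMESTAMPTOSTRING UDF in a new query. Note that, because the query groups by id and name, it returns the maximum created_at for each (id, name) pair within each one-hour window.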
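
As a rough, untested sketch of that follow-up query: assuming the maxRow table defined above, and assuming your KSQL version allows a persistent query over a windowed table, the conversion could look something like this. The table name maxRowFormatted and the column alias created_at_str are made up for illustration.

```sql
-- Hypothetical follow-up query; maxRowFormatted and created_at_str are illustrative names.
-- TIMESTAMPTOSTRING takes epoch milliseconds (BIGINT) and a Java date-format pattern.
CREATE TABLE maxRowFormatted AS
SELECT
    id,
    name,
    TIMESTAMPTOSTRING(created_at, 'yyyy-MM-dd HH:mm:ss.SSS') AS created_at_str
FROM maxRow;
```

In the pattern, MM is the month and HH is the hour on a 24-hour clock; lowercase mm means minutes and hh means the hour on a 12-hour clock.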

Please let me know if you find any issues.

Upvotes: 4
