Reputation: 316
Say I have a topic with temperature forecast data, as follows:
2018-10-25,Melbourne,21
2018-10-26,Melbourne,17
2018-10-27,Melbourne,21
2018-10-25,Sydney,22
2018-10-26,Sydney,20
2018-10-27,Sydney,23
2018-10-26,Melbourne,18
2018-10-27,Melbourne,22
2018-10-26,Sydney,21
2018-10-27,Sydney,24
Each entry contains a date, a city, and a forecast temperature, and represents an update to the forecast for that city on that date. I can describe it as a KSQL stream like this:
CREATE STREAM forecasts_csv ( \
date VARCHAR, \
city VARCHAR, \
temperature INTEGER \
) WITH (kafka_topic='forecasts-csv', value_format='DELIMITED');
Now, I want a table that represents the current (i.e. the latest) forecast temperature for each city, as well as the min and max of that forecast over time. An example desired output is:
{ date='2018-10-27', city='Melbourne', latest=22, min=21, max=22 }
How can I achieve this?
I've managed to get the aggregates (min/max) as follows:
CREATE STREAM forecasts_keyed \
WITH (partitions=4, value_format='JSON') \
AS SELECT date + '/' + city AS forecast_key, * \
FROM forecasts_csv \
PARTITION BY forecast_key;
CREATE TABLE forecasts_minmax \
WITH (partitions=4, value_format='JSON') \
AS SELECT forecast_key, date, city, \
min(temperature) as min, max(temperature) as max \
FROM forecasts_keyed \
GROUP BY forecast_key, date, city;
which gives me output messages like:
{"FORECAST_KEY":"2018-10-27/Melbourne","DATE":"2018-10-27","CITY":"Melbourne","MIN":21,"MAX":22}
but I can't work out how to combine this with the "latest" reading.
Upvotes: 5
Views: 2037
Reputation: 684
You need to implement a UDAF (let's call it LATEST) that keeps the latest value of a given column for each key. This is straightforward, and the KSQL docs explain how to add a custom UDAF: https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#udafs
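As a rough illustration of what such a LATEST aggregate could look like, here is a minimal Java sketch. It is not the actual KSQL API: the nested Udaf interface is a local stand-in modelled on the initialize/aggregate/merge shape of KSQL's UDAF interface, and the names LatestUdafSketch, LatestInteger and latestOf are made up for this example. A real extension would implement the interface shipped with your KSQL version and register it with the annotations described in the linked docs. "Latest" here means the last value seen in arrival order, and skipping nulls is a design choice of this sketch.

```java
import java.util.List;

// Demo entry point, declared first so `java LatestUdafSketch.java` can run it.
class LatestUdafSketch {

    // Local stand-in (assumption) mirroring the initialize/aggregate/merge
    // shape of a KSQL UDAF. Replace with the real KSQL interface in an extension.
    interface Udaf<V, A> {
        A initialize();
        A aggregate(V value, A aggregate);
        A merge(A one, A two);
    }

    // Hypothetical LATEST for INTEGER columns: keeps the most recent non-null value.
    static final class LatestInteger implements Udaf<Integer, Integer> {
        @Override
        public Integer initialize() {
            return null; // nothing seen yet
        }

        @Override
        public Integer aggregate(Integer value, Integer aggregate) {
            // A new non-null value replaces whatever was stored before.
            return value != null ? value : aggregate;
        }

        @Override
        public Integer merge(Integer one, Integer two) {
            // When two partial aggregates are combined, prefer the second one.
            return two != null ? two : one;
        }
    }

    // Feeds values through the aggregate in order, as the engine would per key.
    static Integer latestOf(List<Integer> values) {
        Udaf<Integer, Integer> udaf = new LatestInteger();
        Integer agg = udaf.initialize();
        for (Integer v : values) {
            agg = udaf.aggregate(v, agg);
        }
        return agg;
    }

    public static void main(String[] args) {
        // The two Melbourne forecasts for 2018-10-27 from the question.
        System.out.println(latestOf(List.of(21, 22)));
    }
}
```

Note that ordering is only meaningful per partition, so all updates for a given date and city need to arrive on the same partition for "latest" to be well defined.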
Assuming that you have the LATEST UDAF available, you can write the following query:
CREATE TABLE foo AS
SELECT
date,
city,
MIN(temperature) AS minValue,
MAX(temperature) AS maxValue,
LATEST(temperature) AS latestValue
FROM forecasts_csv
GROUP BY date, city;
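To sanity-check what this query should produce, the grouping can be simulated outside KSQL. The plain Java sketch below (class and method names are made up for illustration) replays the sample records from the question in order and keeps min, max and latest per date/city key, which is what the table is expected to hold under the assumption that LATEST means "last value seen".

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForecastTableSketch {

    // One row of the simulated table for a single (date, city) key.
    static final class Row {
        int min, max, latest;

        Row(int first) {
            min = max = latest = first;
        }

        void update(int t) {
            min = Math.min(min, t);
            max = Math.max(max, t);
            latest = t; // the most recently seen forecast wins
        }
    }

    // Sample records from the question, in topic order.
    static final String[] SAMPLE = {
        "2018-10-25,Melbourne,21", "2018-10-26,Melbourne,17",
        "2018-10-27,Melbourne,21", "2018-10-25,Sydney,22",
        "2018-10-26,Sydney,20", "2018-10-27,Sydney,23",
        "2018-10-26,Melbourne,18", "2018-10-27,Melbourne,22",
        "2018-10-26,Sydney,21", "2018-10-27,Sydney,24"
    };

    // Groups by date and city, updating the row for each incoming record.
    static Map<String, Row> aggregate(String[] lines) {
        Map<String, Row> table = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            String key = f[0] + "/" + f[1];
            int t = Integer.parseInt(f[2]);
            Row row = table.get(key);
            if (row == null) {
                table.put(key, new Row(t));
            } else {
                row.update(t);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        aggregate(SAMPLE).forEach((key, r) ->
            System.out.println(key + " latest=" + r.latest
                + " min=" + r.min + " max=" + r.max));
    }
}
```

For the key 2018-10-27/Melbourne this yields latest=22, min=21, max=22, matching the desired output in the question.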
Upvotes: 3