Reputation: 560
I have a simple stream of data which is of this form:
id | name | eventType  | eventTime
----------------------------------
1  | A    | PLAY       | (ts of when the client fired the event)
1  | B    | IMPRESSION |
2  | A    | CLICK      |
The end goal is to compute, per id and name, the count of CLICK events divided by the count of IMPRESSION events over a tumbling window of 60 seconds.
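For intuition, here is what that grouping should produce, sketched as a toy in-memory computation in plain Java (not Flink; the data and names are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class TumblingCtr {
    // Start of the 60-second tumbling window containing tsMillis
    static long windowStart(long tsMillis) {
        return tsMillis - tsMillis % 60_000L;
    }

    public static void main(String[] args) {
        // {id_name, eventType, timestampMillis} — toy data with only the two
        // event types that matter for the ratio
        Object[][] events = {
            {"1_A", "IMPRESSION", 1_000L},
            {"1_A", "IMPRESSION", 2_000L},
            {"1_A", "CLICK",      3_000L},
            {"1_A", "IMPRESSION", 61_000L}, // falls into the next window
        };
        // key = id_name + "@" + windowStart, value = {impressions, clicks}
        Map<String, long[]> counts = new HashMap<>();
        for (Object[] e : events) {
            String key = e[0] + "@" + windowStart((Long) e[2]);
            long[] c = counts.computeIfAbsent(key, k -> new long[2]);
            if ("IMPRESSION".equals(e[1])) c[0]++; else c[1]++;
        }
        long[] w0 = counts.get("1_A@0");
        System.out.println(100.0 * w0[1] / w0[0]); // prints 50.0
    }
}
```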
In pure SQL it would look like:
SELECT d.id, d.name, d.impressionCount, d.clickCount,
       d.clickCount / d.impressionCount * 100.0
FROM (
    SELECT i.id, i.name, count(*) AS clickCount, c.impressionCount
    FROM events AS i
    LEFT JOIN (
        SELECT id, name, count(*) AS impressionCount
        FROM events
        WHERE event_type = 'IMPRESSION'
        GROUP BY id, name
    ) AS c
    ON i.id = c.id AND i.name = c.name
    WHERE i.event_type = 'CLICK'
    GROUP BY i.id, i.name, c.impressionCount
) AS d
So I first need to create a column with the number of clicks and another with the number of impressions, and then use that table to do the division.
My question is: what is the best way to do this with the Flink APIs? I have attempted this:
Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");
and the same for the impressions, and then I join the two tables. But I do not get the right result, and I'm not sure this is the best way to achieve what I want with a tumbling window.
EDIT:
This is how I transform the stream into tables:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
@Override
public long extractAscendingTimestamp(EventWithCount element) {
try {
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Date parsedDate = df1.parse(element.eventTime);
Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
return timestamp.getTime();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}});
Table eventsTable = tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
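One thing worth double-checking in the extractor above: SimpleDateFormat has no real sub-millisecond support, so the SSSSSS pattern parses all six fraction digits as a millisecond count (e.g. .123456 becomes 123456 ms, shifting the timestamp by roughly two minutes), and SimpleDateFormat is also not thread-safe. A java.time-based sketch, assuming the eventTime strings are in UTC:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeParser {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat;
    // here 'S' is fraction-of-second, so six digits are real microseconds.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");

    // Parses an eventTime string into epoch milliseconds, assuming UTC
    // (an assumption; adjust the offset if your timestamps are zoned differently).
    public static long toEpochMillis(String eventTime) {
        return LocalDateTime.parse(eventTime, FMT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2024-01-01T00:00:00.000000")); // prints 1704067200000
    }
}
```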
Upvotes: 1
Views: 520
Reputation: 18987
Your Table API query to count the CLICK events by id and name per minute looks good.
Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as clickId, eventType.count as clickCount, minuteWindow.rowtime as clickMin");
Do the same for IMPRESSION:
Table impressionCountTable = eventsTable
.where("eventType = 'IMPRESSION'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as impId, eventType.count as impCount, minuteWindow.rowtime as impMin");
Finally, you have to join both tables:
Table result = impressionCountTable
.leftOuterJoin(clickCountTable, "impId = clickId && impMin = clickMin")
.select("impId as id, impMin as minute, clickCount / impCount as ratio");
Note the join condition impMin = clickMin. This turns the join into a time-windowed join with a minimal window size of 1 millisecond (milliseconds are the granularity of time in Flink SQL).
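One detail to watch in the final select: eventType.count yields a BIGINT, so clickCount / impCount is integer division and rounds every ratio below 100% down to 0. In the Table API expression string you could cast before dividing, e.g. something like "(clickCount.cast(DOUBLE) / impCount) * 100 as ratio" (syntax from memory, double-check against your Flink version). The underlying arithmetic in plain Java:

```java
public class RatioDemo {
    public static void main(String[] args) {
        long clickCount = 3L;  // counts come back as BIGINT, i.e. Java long
        long impCount = 10L;

        // Integer division truncates toward zero before the multiplication
        long intRatio = clickCount / impCount * 100;
        // Casting one operand promotes the division to floating point
        double realRatio = (double) clickCount / impCount * 100;

        System.out.println(intRatio);  // prints 0
        System.out.println(realRatio); // prints 30.0
    }
}
```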
You said that the query did not behave as you expected. Can you be more specific about your expected and actual results?
Upvotes: 1