Reputation: 2419
I'm working with Flink in Scala, and I'm trying to get the count of unique issues per repo. I have a datastream with tuples like this: (repo_name, issue_id, event_time).
How can I obtain the count of unique issue_id's for each repo_name? I think I have to use mapWithState, but I am not sure how to use it.
Thanks in advance.
Upvotes: 0
Views: 456
Reputation: 896
Let's assume you want to process the events in a tumbling time window of 7 days.
// eventStream: stream of case classes of type GithubEvent
eventStream
  // only look at IssuesEvents
  .filter(e => e.`type` == "IssuesEvent")
  // key by the name of the repository
  .keyBy("repo.name")
  // tumbling event-time window of one week
  .timeWindow(Time.days(7))
  // apply a window function that counts distinct issue ids
  .apply { (key, _, vals, out: Collector[String]) =>
    // collect the issue ids into a Set to drop duplicates
    // (assuming GithubEvent exposes the issue id as `issueId`)
    val uniqueIssues = vals.map(_.issueId).toSet
    out.collect(s"Repo name: $key Unique issues: ${uniqueIssues.size}")
  }
To count the number of unique issues per repository, we only look at IssuesEvents. We key the stream by the name of the repository. Then, we apply a window function that deduplicates the issue ids within the window and emits a String with the number of unique issues.
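If the windows hold many events, buffering them all before counting can get expensive. A sketch of an incremental alternative, assuming a `GithubEvent` case class with a numeric `issueId` field (a name I am inventing here), is to use `aggregate` with an `AggregateFunction` whose accumulator is the set of ids seen so far:

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Deduplicates issue ids as events arrive, so Flink only keeps the
// accumulator (a Set) per window instead of buffering every event.
class UniqueIssueCount extends AggregateFunction[GithubEvent, Set[Long], Long] {
  override def createAccumulator(): Set[Long] = Set.empty
  override def add(e: GithubEvent, acc: Set[Long]): Set[Long] = acc + e.issueId
  override def getResult(acc: Set[Long]): Long = acc.size.toLong
  override def merge(a: Set[Long], b: Set[Long]): Set[Long] = a ++ b
}
```

You would then replace the `.apply { ... }` call with `.aggregate(new UniqueIssueCount)`, which yields a stream of counts instead of formatted Strings.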
Upvotes: 0