SimonDahrs

Reputation: 2419

Flink stream processing count unique issues per repo

I'm working with Flink in Scala, and I'm trying to get the count of unique issues per repo. I have a datastream with tuples like this: (repo_name, issue_id, event_time). How can I obtain the count of unique issue_ids for each repo_name? I think I have to use mapWithState, but I am not sure how to use it.

Thanks in advance.

Upvotes: 0

Views: 456

Answers (1)

Elar

Reputation: 896

Let's assume you want to process the events in a tumbling time window of 7 days.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// eventStream: stream of case classes of type GithubEvent
eventStream
  // only look at IssuesEvents
  .filter(e => e.`type` == "IssuesEvent")
  // key by the name of the repository
  .keyBy("repo.name")
  // tumbling time window of a week
  .timeWindow(Time.days(7))
  // apply a window function that counts the IssuesEvents
  // that fell into the window for this repository
  .apply { (key, _, vals, out: Collector[String]) =>
    out.collect(s"Repo name: $key Unique issues: ${vals.size}")
  }

To count the number of issues per repository, we only look at IssuesEvents, key the stream by the name of the repository, and apply a window function that emits a String with the count per window. One caveat: vals.size counts IssuesEvents, and a single issue can produce several such events within a week (opened, closed, reopened, ...), so to count truly unique issues you have to deduplicate by issue_id first, as in the sketch below.
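
Since your stream already carries (repo_name, issue_id, event_time) tuples, you can deduplicate inside the window function by collecting the issue_ids into a Set. This is only a sketch under some assumptions: the stream is called issueStream here, the element types are (String, Long, Long), and your environment is configured for event time with timestamps and watermarks assigned.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// issueStream: DataStream[(String, Long, Long)] of
// (repo_name, issue_id, event_time) tuples (name and types assumed)
issueStream
  // key by repo_name
  .keyBy(_._1)
  // same tumbling window of a week
  .timeWindow(Time.days(7))
  // collect the issue_ids of the window into a Set, so an issue that
  // produced several events in the window is counted exactly once
  .apply { (repo, _, vals, out: Collector[String]) =>
    val uniqueIssues = vals.map(_._2).toSet
    out.collect(s"Repo name: $repo Unique issues: ${uniqueIssues.size}")
  }

As for mapWithState, which you mention: that also works if you want a running count that is never reset by a window, by keeping the Set of seen issue_ids as keyed state. A minimal sketch under the same assumptions as above; note that this state grows without bound unless you clear it at some point.

issueStream
  .keyBy(_._1)
  // state: the Set of issue_ids seen so far for this repo
  .mapWithState { (event: (String, Long, Long), state: Option[Set[Long]]) =>
    val seen = state.getOrElse(Set.empty[Long]) + event._2
    // emit (repo_name, unique issues so far) and store the updated Set
    ((event._1, seen.size), Some(seen))
  }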

Upvotes: 0
