Reputation: 77
the code is below:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String](),(a:(Int,String),b:mutable.HashSet[String])=>a)
error occurs during compilation,the error message is:
Error: missing arguments for method fold in class WindowedStream; follow this method with `_' if you want to treat it as a partially applied function timeWindow(Time.seconds(30), Time.seconds(1)).fold(mutable.HashSetString,
but the function defined in class WindowedStream is:
public fold(R initialValue, FoldFunction function)
Upvotes: 1
Views: 223
Reputation: 13346
The problem is twofold: First of all, the fold
function expects the FoldFunction
to be passed in a second parameter list if you're using Scala. Secondly, the first parameter of the FoldFunction
should be of the aggregating type. Thus, in your case it should be of type mutable.HashSet[String]
. The following snippet should do the trick:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String]()){
(a: mutable HashSet[String], b: (Int, String)) => a
}
Be aware that the Flink's fold
API call is deprecated. It's now recommend to use the aggregate
API call.
Upvotes: 3