Reputation: 3531
This is a non-working attempt to use Flink's fold with a Scala anonymous function:
val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])
It compiles fine, but at runtime I get a "type erasure issue" (see below). Doing the same in Java works, but is of course more verbose. I like concise and clear lambdas. How can I do this in Scala?
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined.
This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Upvotes: 5
Views: 637
Reputation: 13346
The problem you encountered is a bug in Flink [1]. The problem originates from Flink's TypeExtractor and the way the Scala DataStream API is implemented on top of the Java implementation. The TypeExtractor cannot generate a TypeInformation for the Scala type and thus returns a MissingTypeInformation. This missing type information is manually set after creating the StreamFold operator. However, the StreamFold operator is implemented in a way that it does not accept a MissingTypeInformation and, consequently, fails before setting the right type information.
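To make the ordering problem concrete, here is a toy model in plain Scala. It is only a sketch: TypeInfoSketch, MissingSketch, KnownSketch, both operator classes and OrderingSketch are hypothetical names invented for illustration and are not Flink's classes. The "lenient" variant shows one conceivable way to avoid the failure, which is not necessarily what the pull request does.

```scala
// Toy model of the ordering problem described above.
// All names here are hypothetical stand-ins, NOT Flink's real classes.
sealed trait TypeInfoSketch
case object MissingSketch extends TypeInfoSketch // stands in for MissingTypeInformation
final case class KnownSketch(name: String) extends TypeInfoSketch

// Stand-in for an operator that rejects a missing type already in its constructor.
final class EagerFoldOperatorSketch(initial: TypeInfoSketch) {
  require(initial != MissingSketch, "type could not be determined")
  private var outType: TypeInfoSketch = initial
  def setOutputType(t: TypeInfoSketch): Unit = outType = t
  def outputType: TypeInfoSketch = outType
}

// Stand-in for an operator that tolerates a placeholder until the type is read.
final class LenientFoldOperatorSketch(initial: TypeInfoSketch) {
  private var outType: TypeInfoSketch = initial
  def setOutputType(t: TypeInfoSketch): Unit = outType = t
  def outputType: TypeInfoSketch = {
    require(outType != MissingSketch, "type was never set")
    outType
  }
}

object OrderingSketch {
  // Mimics the wrapper: create the operator with whatever the extractor
  // returned, then set the correct type afterwards.
  def buildEager(extracted: TypeInfoSketch, correct: TypeInfoSketch): Either[String, TypeInfoSketch] =
    try {
      val op = new EagerFoldOperatorSketch(extracted) // throws here for MissingSketch
      op.setOutputType(correct)                       // never reached in that case
      Right(op.outputType)
    } catch { case e: IllegalArgumentException => Left(e.getMessage) }

  def buildLenient(extracted: TypeInfoSketch, correct: TypeInfoSketch): Either[String, TypeInfoSketch] =
    try {
      val op = new LenientFoldOperatorSketch(extracted) // placeholder accepted
      op.setOutputType(correct)                         // correct type arrives in time
      Right(op.outputType)
    } catch { case e: IllegalArgumentException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // prints: Left(requirement failed: type could not be determined)
    println(buildEager(MissingSketch, KnownSketch("Double")))
    // prints: Right(KnownSketch(Double))
    println(buildLenient(MissingSketch, KnownSketch("Double")))
  }
}
```

In this model the eager variant fails even though the caller was about to supply the correct type, which mirrors the failure described above.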
I've opened a pull request [2] to fix this problem. It should be merged within the next two days. Once it is merged, using the latest 0.10 snapshot version should fix your problem.
Upvotes: 3