Reputation: 191
Part of the code I am trying to execute in Flink:
val pages = env.readCsvFile[(Long)]("/home/ppi.csv",
fieldDelimiter = "\t", includedFields = Array(1))
I want to use pages
for some other purpose but when I compile, Flink throws me an error message as
Exception in thread "main" java.lang.ClassCastException:
org.apache.flink.api.common.typeinfo.IntegerTypeInfo cannot be cast to
org.apache.flink.api.java.typeutils.PojoTypeInfo
By the way I am using the 0.9 snapshot version of Flink. Any help in the right direction is highly appreciated.
Upvotes: 2
Views: 999
Reputation: 18987
If you read from a CSV file, the return type will be a Scala tuple with all read fields. In your example, you are only reading a single field which will give a Tuple1. This is what you try to specify with the parentheses surrounding "Long":
readCsvFile[(Long)]
In Scala you can only specify Tuples with two or more fields using parentheses. So you need to write instead
readCsvFile[Tuple1[Long]]
The exception is thrown because, Flink's CSVInputFormat tries to interpret all non-Tuple types as Pojo types.
Upvotes: 3