Flink CSV file reader fails to cast a LongType into a PojoType

Part of the code I am trying to execute in Flink:

val pages = env.readCsvFile[(Long)]("/home/ppi.csv",
   fieldDelimiter = "\t", includedFields = Array(1))

I want to use pages for some other purpose, but when I run it, Flink throws the following error:

Exception in thread "main" java.lang.ClassCastException:
org.apache.flink.api.common.typeinfo.IntegerTypeInfo cannot be cast to
org.apache.flink.api.java.typeutils.PojoTypeInfo

By the way, I am using the 0.9-SNAPSHOT version of Flink. Any help in the right direction is highly appreciated.

Upvotes: 2

Views: 999

Answers (1)

Fabian Hueske

Reputation: 18987

If you read from a CSV file, the return type will be a Scala tuple containing all read fields. In your example you are only reading a single field, which will give a Tuple1. This is what you are trying to specify with the parentheses around "Long":

readCsvFile[(Long)]

In Scala, parentheses denote a tuple type only for two or more fields; (Long) is the same as plain Long. So you need to write instead

readCsvFile[Tuple1[Long]]

The exception is thrown because Flink's CsvInputFormat tries to interpret all non-tuple types as POJO types.
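
For reference, here is a minimal sketch of the corrected read, reusing the path, delimiter, and field index from the question (the object and variable names are just placeholders):

import org.apache.flink.api.scala._

object ReadPages {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tuple1[Long] makes Flink pick its tuple type info instead of
    // falling into the POJO path that caused the ClassCastException.
    val pages = env.readCsvFile[Tuple1[Long]]("/home/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1))

    // Unwrap the Tuple1 to get a plain DataSet[Long], which is
    // usually more convenient for further processing.
    val ids = pages.map(_._1)

    ids.print()
  }
}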

Upvotes: 3
