I am a beginner with Flink streaming.
When I read a file with RowCsvInputFormat, the Row that the Kryo serializer creates does not work properly.
The code is below.
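```scala
import org.apache.flink.api.common.typeinfo.Types // assumed: the post does not show which Types was imported
import org.apache.flink.api.java.io.RowCsvInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Three string columns, "\n" as line delimiter, "," as field delimiter
val readLocalCsvFile = new RowCsvInputFormat(
  new Path("flink-test/000000_1"),
  Array(Types.STRING, Types.STRING, Types.STRING),
  "\n",
  ","
)

// Re-scan the path every 1,000,000 ms
val read = env.readFile(
  readLocalCsvFile,
  "flink-test/000000_1",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000000)
read.print()
env.execute("test")
```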
The file 000000_1 contains:
```
aa,bb,cc
aaa,bbb,ccc
```
While debugging, I can see that each line is split correctly into aa, bb, and cc. But when those values are put into the Row's fields one by one, a NullPointerException is thrown because the Row's fields are null.
The image below shows that the fields of the Row are null.
When the code above runs, the Row is created by the KryoSerializer, essentially like this:
```scala
val kryo = new EmptyFlinkScalaKryoInstantiator().newKryo
// Kryo instantiates Row directly; the resulting Row has null fields
val row = kryo.newInstance(classOf[Row])
```
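A possible workaround, sketched under assumptions: in the Scala API, the implicit `TypeInformation[Row]` derived through `org.apache.flink.streaming.api.scala._` is likely a generic, Kryo-backed type, because `Row` is neither a case class nor a POJO. The file reader then asks Kryo for a reuse record, and Kryo can create a `Row` without running its constructor, which would leave `fields` null. Passing a `RowTypeInfo` explicitly should make Flink use its Row serializer instead. The object name `ReadCsvAsRows` is hypothetical, and the exact behavior may differ between Flink versions.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.io.RowCsvInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Hypothetical object name; reads the CSV from the question as Row records.
object ReadCsvAsRows {
  // Three string columns, matching lines such as "aa,bb,cc".
  val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING)

  // Explicit Row type information. Assumption: without it, the Scala API's
  // implicit TypeInformation[Row] is a generic (Kryo-backed) type, and Kryo
  // can instantiate Row without running its constructor, leaving fields null.
  val rowType: RowTypeInfo = new RowTypeInfo(fieldTypes: _*)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "flink-test/000000_1"

    val format = new RowCsvInputFormat(new Path(path), fieldTypes, "\n", ",")

    // Pass rowType to readFile's implicit TypeInformation parameter list
    // so reuse records are created by the Row serializer, not by Kryo.
    val rows: DataStream[Row] =
      env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000000L)(rowType)

    rows.print()
    env.execute("test")
  }
}
```

Passing the type information explicitly, rather than declaring another implicit, avoids any ambiguity with the `createTypeInformation` macro brought in by the wildcard import.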
The error output is as follows:
```
java.lang.NullPointerException
    at org.apache.flink.types.Row.setField(Row.java:140)
    at org.apache.flink.api.java.io.RowCsvInputFormat.fillRecord(RowCsvInputFormat.java:162)
    at org.apache.flink.api.java.io.RowCsvInputFormat.fillRecord(RowCsvInputFormat.java:33)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:113)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:551)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:80)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:387)
    at
```
Could you post the complete code?
Judging from the error report, the problem may be that the number of fields does not match.
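One quick way to test the field-count hypothesis outside Flink is to compare each line's column count with the number of declared field types. The helper `mismatchedLines` below is hypothetical (not a Flink API) and uses only the Scala and Java standard libraries; splitting with limit `-1` keeps trailing empty columns, so a line such as `a,b,` counts as three fields.

```scala
import java.util.regex.Pattern

// Hypothetical helper (not part of Flink): reports lines whose column count
// differs from the number of declared field types.
object FieldCountCheck {
  // Returns (1-based line number, actual column count) for each non-empty
  // line whose column count differs from `expected`.
  def mismatchedLines(lines: Seq[String], delimiter: String, expected: Int): Seq[(Int, Int)] = {
    // Quote the delimiter so characters such as "|" are not treated as regex syntax.
    val splitter = Pattern.quote(delimiter)
    // Limit -1 keeps trailing empty columns, so "a,b," counts as 3 fields.
    lines.zipWithIndex
      .filter { case (line, _) => line.nonEmpty }
      .map { case (line, idx) => (idx + 1, line.split(splitter, -1).length) }
      .filter { case (_, count) => count != expected }
  }
}
```

For the sample file in the question, `FieldCountCheck.mismatchedLines(scala.io.Source.fromFile("flink-test/000000_1").getLines().toList, ",", 3)` should return an empty sequence, since both lines shown have three columns.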