Reputation: 459
I read a CSV file (http://data.gdeltproject.org/events/index.html) from disk with Flink (Java, Maven version 8.1) and get the following exception:
ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
My code:
public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\u0009')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, true, true,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}
Does anyone have a solution?
Best regards, Paul
Upvotes: 1
Views: 279
Reputation: 2371
I am posting the answer from the Apache Flink mailing list here, so people do not have to read through the mailing list archive:
The reason for the error was that custom serialization logic was used, and the deserialization function was erroneous: it did not consume all of the data that the serialization function had written.
The latest master has an improved error message for that.
As background:
Flink supports two kinds of type interfaces that allow programmers to implement their own serialization routines: Writables (Hadoop's core type interface) and Values (Flink's own custom serialization interface).
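The failure mode can be reproduced outside Flink with plain `java.io` streams: if `read()` consumes fewer bytes than `write()` produced, the leftover bytes are misread as the start of the next record. This is a minimal sketch, not the original poster's code; the class name `GeoTimePoint` is hypothetical, and a real Flink `Value` would implement `write(DataOutputView)`/`read(DataInputView)` instead, but the symmetry requirement is the same.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical record type. A Flink Value implementation would have the
// same write/read pair, just against Flink's DataOutputView/DataInputView.
class GeoTimePoint {
    String id;
    long time;
    double lat;
    double lon;

    // Serialization: writes four fields.
    void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeLong(time);
        out.writeDouble(lat);
        out.writeDouble(lon);
    }

    // Correct deserialization: reads back exactly the fields that
    // write() produced, in the same order. A buggy read() that stopped
    // after readLong() would leave 16 unconsumed bytes in the channel;
    // the runtime would then see an event arrive mid-record, which is
    // exactly the "event before completing the current partial record"
    // exception from the question.
    void read(DataInput in) throws IOException {
        id = in.readUTF();
        time = in.readLong();
        lat = in.readDouble();
        lon = in.readDouble();
    }
}

public class SerializationSymmetry {
    public static void main(String[] args) throws IOException {
        GeoTimePoint p = new GeoTimePoint();
        p.id = "US"; p.time = 20150101L; p.lat = 38.9; p.lon = -77.0;

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        p.write(new DataOutputStream(bos));

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        GeoTimePoint q = new GeoTimePoint();
        q.read(in);

        // If read() mirrors write(), the stream is fully consumed.
        System.out.println("bytes left: " + in.available()); // prints 0
    }
}
```

A quick sanity check like `in.available() == 0` after deserializing one record is a cheap way to catch an asymmetric `write`/`read` pair before it surfaces as a confusing runtime error inside the network stack.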
Upvotes: 4