Pa Rö

Reputation: 459

Apache Flink Channel received an event before completing the current partial record

I read a CSV file (http://data.gdeltproject.org/events/index.html) from disk with Flink (Java, Maven version 8.1) and get the following exception:

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.:  DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

my code:

public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\t')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, true,
                true, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}
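As an aside, the 58-entry boolean list is hard to audit by eye. Flink's `CsvReader` also accepts a `String` mask for `includeFields` (`'1'` keeps a column, `'0'` drops it), and a small helper can build that mask from column indices. `fieldMask` below is my own hypothetical helper, not part of the Flink API:

```java
import java.util.Arrays;

// Hypothetical helper (not part of the Flink API): builds a field mask
// string for CsvReader.includeFields(String), where '1' keeps a column
// and '0' drops it.
public class FieldMask {
    static String fieldMask(int totalFields, int... keep) {
        char[] mask = new char[totalFields];
        Arrays.fill(mask, '0');          // drop everything by default
        for (int i : keep) {
            mask[i] = '1';               // keep the selected columns
        }
        return new String(mask);
    }

    public static void main(String[] args) {
        // Columns 0, 1, 39 and 40 of the 58-column file,
        // matching the boolean list above.
        System.out.println(fieldMask(58, 0, 1, 39, 40));
    }
}
```

`.includeFields(fieldMask(58, 0, 1, 39, 40))` would then replace the long boolean list, assuming your Flink version has the `String` overload.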

Maybe someone has a solution?

Best regards, Paul

Upvotes: 1

Views: 279

Answers (1)

Stephan Ewen

Reputation: 2371

I am posting the answer from the Apache Flink mailing list here, so people do not have to read through the mailing list archive:

The reason for the error was that custom serialization logic was used, and the deserialization function was erroneous and did not consume all data.

The latest master has an improved error message for that.

As background:

Flink supports two kinds of type interfaces that allow programmers to implement their own serialization routines: Writables (Hadoop's core type interface) and Values (Flink's own custom serialization interface).
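To illustrate the contract such custom serialization must obey, here is a sketch of the principle with plain `java.io` streams (not Flink's actual `Value`/`Writable` interfaces): the deserializer must consume exactly the bytes the matching serializer produced, otherwise the next record starts mid-stream and the channel sees a partial record.

```java
import java.io.*;

// Sketch of the read/write symmetry that custom serialization requires:
// readRecord must consume exactly the bytes writeRecord produced, or the
// byte stream gets out of sync and subsequent records are misaligned.
public class RecordRoundTrip {

    // Serialize one record: a name and a timestamp.
    static void writeRecord(DataOutput out, String name, long time) throws IOException {
        out.writeUTF(name);
        out.writeLong(time);
    }

    // Correct deserializer: consumes both fields.
    static String readRecord(DataInput in) throws IOException {
        String name = in.readUTF();
        long time = in.readLong(); // forgetting this read leaves 8 stray bytes behind
        return name + "@" + time;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeRecord(out, "GDELT", 1995L);
        writeRecord(out, "event", 2015L);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readRecord(in)); // GDELT@1995
        System.out.println(readRecord(in)); // event@2015
        System.out.println(in.available()); // 0 -> stream fully consumed, records stay aligned
    }
}
```

If `readRecord` skipped the `readLong()`, the second `readUTF()` would start inside the leftover timestamp bytes, which is the same class of misalignment the exception above reports.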

Upvotes: 4
