Hemant Kumar

Reputation: 101

Error while creating a DataStream in Apache Flink

I get an error when creating a DataStream with the fromElements function.

Below is the exception:

Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions. Serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:121)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

Upvotes: 1

Views: 1781

Answers (2)

Matthias J. Sax

Reputation: 62360

Why do you want to process InputStreamReader tuples? I guess there is a misunderstanding here. The generic type specifies the type of the data you want to process. For example:

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

This generates a finite data stream of five Integer elements. I assume that what you actually want is to use an InputStreamReader to generate those elements.
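To make the distinction concrete, here is a minimal plain-Java sketch (no Flink dependency) in which the reader is only used to produce the data, and the lines read from it are what would become the stream elements. The class name `ReaderToElements` and the one-element-per-line format are assumptions for illustration; a `StringReader` stands in for the InputStreamReader over the HTTP response. In Flink, a finite list like this could then be handed to `env.fromCollection(...)`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns the content of a reader into plain String elements.
public class ReaderToElements {

    // Reads every line and returns the lines; the Strings, not the reader,
    // are the values that should be processed as stream elements.
    public static List<String> readLines(Reader source) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // StringReader stands in for the InputStreamReader over the HTTP response
        List<String> elements = readLines(new StringReader("1\n2\n3\n"));
        System.out.println(elements); // prints [1, 2, 3]
    }
}
```

This only fits data that is finite and small enough to read up front on the client; for a continuous feed, reading inside a source function is the better fit.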

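If you want to read via HttpURLConnection, you could implement your own SourceFunction (or RichSourceFunction) as follows. This version emits each line of the response as a String; replace String with the data type you actually want to use (also consider Flink's Tuple0 to Tuple25 types) and parse each line accordingly:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

env.addSource(new SourceFunction<String>() {
    // set to false by cancel() to stop the read loop
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // open the connection inside the source (i.e., on the task manager),
        // so that the InputStreamReader never has to be serialized
        URL url = new URL("ex.in/res"); // placeholder; a real URL needs its protocol, e.g. http://
        HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
        if (httpconn.getResponseCode() != 200) {
            throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(httpconn.getInputStream()))) {
            String line;
            // emit one element per line until the response ends or the job is cancelled
            while (isRunning && (line = reader.readLine()) != null) {
                // hold the checkpoint lock while emitting
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line); // parse the line into your own type here if needed
                }
            }
        } finally {
            // release the connection even if reading fails
            httpconn.disconnect();
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
});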
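The cancellation pattern in the source function above (a `volatile` flag checked by the read loop and cleared by `cancel()`) can be tried out without Flink. The sketch below is a hypothetical plain-Java stand-in, not Flink's API: `CancellableLineSource` and its `run(Reader, Consumer<String>)` signature are assumptions, with a `Consumer` playing the role of `ctx.collect`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction: run() emits lines
// until the input ends or cancel() is called.
public class CancellableLineSource {

    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean isRunning = true;

    public void run(Reader input, Consumer<String> collect) {
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            // the flag is checked before each read, so no line is read after cancel()
            while (isRunning && (line = reader.readLine()) != null) {
                collect.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void cancel() {
        isRunning = false;
    }

    public static void main(String[] args) {
        CancellableLineSource source = new CancellableLineSource();
        List<String> emitted = new ArrayList<>();
        // cancel after the second element to show that the loop stops early
        source.run(new StringReader("a\nb\nc\nd\n"), line -> {
            emitted.add(line);
            if (emitted.size() == 2) {
                source.cancel();
            }
        });
        System.out.println(emitted); // prints [a, b]
    }
}
```

In Flink, `cancel()` is invoked from a different thread than `run()`, which is why the flag has to be `volatile`; here it is called from inside the callback only to keep the example deterministic.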

Upvotes: 1

Till Rohrmann

Reputation: 13356

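You cannot create a DataStream<InputStreamReader> with fromElements because InputStreamReader is not serializable, and fromElements requires serializable elements: it serializes the elements into the source function when the job is built and deserializes them again when the source runs, which is where the exception in your stack trace (FromElementsFunction.run) is thrown. Furthermore, it usually makes little sense to process InputStreamReaders as stream elements. It would be better to read the data from the HttpURLConnection first and then continue working on that data.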
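The non-serializability can be checked with plain Java. This sketch uses Java's built-in `ObjectOutputStream`, not the KryoSerializer named in the stack trace, so it illustrates the general point rather than reproducing the Flink failure; `SerializabilityCheck` and `isJavaSerializable` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // Returns true if Java serialization can write the object, false if it
    // rejects it with NotSerializableException.
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(new byte[0]));
        System.out.println(reader instanceof Serializable); // prints false
        System.out.println(isJavaSerializable(reader));      // prints false
        System.out.println(isJavaSerializable("some data")); // prints true
    }
}
```

The String read from the reader serializes fine, while the reader itself does not, which matches the advice to turn the HTTP response into data before building the stream.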

Upvotes: 0
