Reputation: 101
Error while creating a DataStream using fromElements function
Below is the expeption -
Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions. Serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:121) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745)
Upvotes: 1
Views: 1781
Reputation: 62360
Why do you want to process InputStreamReader
tuples? I guess there is some miss understanding here. The generic type specifies the type of the data you want to process. For example
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
Generate a finite data stream with 5 Integer
tuples. I assume that you actually want to use an InputStreamReader
to generate the actual tuples.
If you want to read via HttpURLConnection
you could implement your own SourceFunction
(or RichSourceFunction
) as follows (replace OUT
with you actual data type you want to use -- also consider Flinks Tuple0
to Tuple25
types):
env.addSource(new SourceFunction<OUT> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OUT> ctx) {
InputStreamReader isr = null;
try {
URL url = new URL("ex.in/res");
HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
if (httpconn.getResponseCode() != 200)
throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
isr = new InputStreamReader((httpconn.getInputStream()));
} catch (Exception e) {
// clean up; log error
return;
}
while(isRunning) {
OUT tuple = ... // get data from isr
ctx.collect(tuple);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
});
Upvotes: 1
Reputation: 13356
You cannot create a DataStream<InputStreamReader>
with fromElements
since the InputStreamReader
is not serializable. That's required by the fromElements
method. Furthermore, it probably does not make so much sense to work on InputStreamReaders
. I guess it would be better to simply read the data from the HttpURLConnection
and then continue working on this data.
Upvotes: 0