Daniel Sali

Reputation: 13

Apache Flink fromCollection java.lang.IllegalStateException: unread block data

I'm using Scala and Flink 1.0-SNAPSHOT to perform a leftOuterJoin on a DataSet, and I get the following exception:

    11:54:15,921 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

I use a simple Scala case class as the type of the DataSet:

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

I use the following method to generate the case class instances:

def getRawValuesFromZipFile(fileName: String) : Array[RawValue]

I initialise the environment and create the DataSet[RawValue] the following way:

import org.apache.flink.api.scala._  // required for the implicit TypeInformation of RawValue

val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print()

I suspect a serialisation issue is causing the error. I'm compiling the project with Scala 2.10.5 and the Java 7 system libraries, in Eclipse, and the project was generated by the sample project generation script.

Any help or hints on resolving the issue would be greatly appreciated :-) Thanks, Daniel

Upvotes: 1

Views: 1054

Answers (1)

aljoscha

Reputation: 986

The env.fromCollection() call might not really be suited for your use case: it breaks if the data becomes too big, because the whole collection is serialised and shipped with the job, and the data is not read in parallel on the worker nodes.

You could look at reading compressed files instead: https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-compressed-files and see if it works for your case. It only supports gzip out of the box, but maybe you could compress your data with that format instead of zip.
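For illustration, here is a minimal sketch of that approach, assuming the zip archive is recompressed as a gzip text file; the file name rawvalues.csv.gz and the comma-separated field layout are assumptions for this sketch, not part of the question. Flink's input formats decompress files with a .gz extension transparently, and the read then happens on the workers instead of the data being shipped with the job:

import org.apache.flink.api.scala._

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

object ReadGzippedRawValues {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment(4)

    // Files ending in .gz are decompressed transparently by Flink.
    // "rawvalues.csv.gz" and the comma-separated layout are assumptions.
    val rawValues: DataSet[RawValue] = env
      .readTextFile("rawvalues.csv.gz")
      .map { line =>
        val Array(date, dsCode, datatype, quote, name) = line.split(",")
        RawValue(date, dsCode, datatype, quote.toDouble, name)
      }

    rawValues.print()
  }
}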

Upvotes: 3
