Reputation: 13
I'm using Scala and Flink 1.0-SNAPSHOT to perform a leftOuterJoin on a DataSet, and I get the following exception:
11:54:15,921 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
I use a simple Scala case class as the type of the DataSet:
case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)
I use the following method to generate the case class instances:
def getRawValuesFromZipFile(fileName: String) : Array[RawValue]
I initialise the environment and create the DataSet[RawValue] the following way:
val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print
I suspect a serialisation issue is causing the error. I'm using Scala 2.10.5 and the Java 7 system libraries to compile the project. I'm using Eclipse, and the project was generated by the sample project generation script.
Any help or hints on resolving the issue would be greatly appreciated :-) Thanks, Daniel
Upvotes: 1
Views: 1054
Reputation: 986
The env.fromCollection() call might not really be suited for your use case. It breaks if the data becomes too big, because the data is shipped with the job. The data is also not read in parallel on the worker nodes.
You could look at https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-compressed-files and see if it works for your case. It only supports gzip, but maybe you could compress your data in that format instead.
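If recompressing by hand is inconvenient, the conversion from zip to gzip could be done with the JDK's java.util.zip classes. The following is only a rough sketch: the object name ZipToGzip and the method convert are made up for illustration, and it assumes that every entry in the archive is a text file ending in a newline, so that concatenating the entries into one gzip stream is acceptable.

```scala
import java.io.{BufferedInputStream, FileInputStream, FileOutputStream}
import java.util.zip.{GZIPOutputStream, ZipInputStream}

// Hypothetical helper, not part of Flink: re-packs a zip archive as one gzip file.
object ZipToGzip {

  /** Concatenates all regular entries of the zip at `zipPath` into a gzip file at `gzipPath`. */
  def convert(zipPath: String, gzipPath: String): Unit = {
    val zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(zipPath)))
    val gzipOut = new GZIPOutputStream(new FileOutputStream(gzipPath))
    try {
      val buffer = new Array[Byte](8192)
      var entry = zipIn.getNextEntry
      while (entry != null) {
        // Directory entries carry no data, so only regular entries are copied.
        if (!entry.isDirectory) {
          // read() returns -1 once the current entry is exhausted.
          var read = zipIn.read(buffer)
          while (read != -1) {
            gzipOut.write(buffer, 0, read)
            read = zipIn.read(buffer)
          }
        }
        entry = zipIn.getNextEntry
      }
    } finally {
      gzipOut.close()
      zipIn.close()
    }
  }
}
```

After something like ZipToGzip.convert("filename.zip", "filename.csv.gz") (both file names are placeholders), the resulting file could presumably be read with env.readTextFile("filename.csv.gz") as described at the link above, instead of going through env.fromCollection().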
Upvotes: 3