Reputation: 2957
I'm trying to run an integration test of my Flink application. My test code looks like this:
public class HttpsCsvIngestorTest extends AbstractTestBase {
    private final static Logger LOG = LoggerFactory.getLogger(HttpsCsvIngestorTest.class);

    @Test
    public void testHttpsCsvIngestion() throws Exception {
        Thread flinkJob = new Thread(new Runnable() {
            @Override
            public void run() {
                String[] args = new String[] { "--configFile", "the/path/to/config.properties", "--secretKey",
                        "12345" };
                JobExecutionResult execResult = CsvProcessorFlinkDriver.runFlinkJob(args);
            }
        });
        flinkJob.start();
        LOG.info("Starting flink job");
        Thread.sleep(10000);
        String[] args2 = new String[] { "localhost", filename };
        FileUploadClient.main(args2);
        Thread.sleep(30000);
        assertTrue(new File(System.getProperty("user.dir") + File.separator + "C:/Desktop/Result.csv")
                .exists());
        System.out.println("Test completed. Going to shutdown flink job");
    }
}
Here I'm starting my Flink application from a child thread and uploading a file for processing from the main thread. The test runs fine, and I get the expected result file. However, I get the following errors at the end, when the application is being shut down:
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477 - Error during disposal of stream operator.
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) Task Task.java:843 - FATAL - exception in resource cleanup of task Source: JettyServerFileSource -> Map -> Process -> Sink: Unnamed (1/1) (12d3e0627e62ad44c57c45b720682e56).
java.lang.IllegalStateException: Memory manager has been shut down.
at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:470)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:824)
at java.lang.Thread.run(Thread.java:745)
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:566)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:540)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.executeJobBlocking(FlinkMiniCluster.scala:714)
at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at mycode.CsvProcessorFlinkDriver.flinkJettyExecution(CsvProcessorFlinkDriver.java:132)
at mycode.CsvProcessorFlinkDriver.runFlinkJob(CsvProcessorFlinkDriver.java:56)
at com.demo.code.HttpsCsvIngestorTest$1.run(HttpsCsvIngestorTest.java:30)
at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/$a#-1711434410]] after [21474835000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(LightArrayRevolverScheduler.scala:338)
at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:142)
at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:141)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at akka.actor.LightArrayRevolverScheduler.close(LightArrayRevolverScheduler.scala:140)
at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:892)
at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:826)
at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:842)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Here, CsvProcessorFlinkDriver.java:132 is the line containing executionResult = env.execute.
Is there something I'm doing wrong? I also notice that if I start the Flink application in the main thread of the test class (instead of from a child thread), execution never reaches the lines from LOG.info("Starting flink job"); onward.
My Flink version is 1.5.0. The test gives the same error with Flink 1.6.0.
Upvotes: 0
Views: 1351
Reputation: 2040
When a Flink job ends, Flink unloads the classes used by your code. If something tries to use those classes after Flink has unloaded them, you usually end up with ClassNotFound exceptions, so you have to make sure no code touches those classes.
In your case you get a NoSuchMethodError on the class org.apache.commons.io.IOUtils. It seems that you are using a different version of commons-io and that JettyServerFileSource tries to AutoClose some resource.
Check whether you have a dependency on commons-io with a version other than 2.4 (the version used by Flink).
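One way to check which commons-io copy the test actually sees is to ask the JVM where IOUtils was loaded from and whether it has the method named in the stack trace. The following is only a rough sketch: the class name ClasspathCheck and its two helper methods are made up for illustration, and it assumes you run it with the same class path as the failing test.

```java
import java.io.Closeable;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink or commons-io.
public class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a note if unknown. */
    public static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes loaded by the bootstrap loader have no code source.
                return "(JDK / bootstrap class path)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on the class path)";
        }
    }

    /** Returns true if the class is loadable and has a public method with this signature. */
    public static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class.forName(className).getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String ioUtils = "org.apache.commons.io.IOUtils";
        System.out.println("IOUtils loaded from: " + locationOf(ioUtils));
        // This is the signature from the NoSuchMethodError: closeQuietly(Ljava/io/Closeable;)V
        System.out.println("closeQuietly(Closeable) present: "
                + hasMethod(ioUtils, "closeQuietly", Closeable.class));
    }
}
```

If the second line prints false, the jar named in the first line is the one to exclude or upgrade. If the project is built with Maven (an assumption, since the question does not say), mvn dependency:tree -Dincludes=commons-io should show which dependency pulls that jar in.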
Upvotes: 1