Jim Wen

Reputation: 11

Flink BlockElement Exception when updating to version 1.14.2

Everything worked well with Flink 1.13.1. We recently upgraded to Flink 1.14.2, and running the following code now throws an exception:

<T> DataStream<Tuple3<String, String, T>> returnsInternal(SiddhiOperatorContext siddhiContext, String[] executionPlanIds) {
    if (createdDataStream == null) {
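        // The anonymous MapFunction below captures siddhiContext and executionPlanIds,
        // so both become fields of its closure and must be serializable.
        DataStream<Tuple2<StreamRoute, Object>> mapped = this.dataStream.map(new MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>() {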
            @Override
            public Tuple2<StreamRoute, Object> map(Tuple2<StreamRoute, Object> value) throws Exception {
                if (executionPlanIds != null && executionPlanIds.length != 0) {
                    for (String executionPlanId : executionPlanIds) {
                        if (!executionPlanId.isEmpty()
                                && siddhiContext.getExecutionPlan(executionPlanId).IsUsedStream(value.f0.getInputStreamId())) {
                            value.f0.addExecutionPlanId(executionPlanId);
                        }
                    }
                }
                return value;
            }
        });
        
        createdDataStream = SiddhiStreamFactory.createDataStream(siddhiContext, mapped);
    }

    return createdDataStream;
}

The exception and call stack are as follows:

org.apache.flink.api.common.InvalidProgramException: The implementation of the BlockElement is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2139)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:577)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:135)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStreamBase.returnsInternal(ExecutionSiddhiStreamBase.java:123)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRow(ExecutionSiddhiStream.java:180)
    at org.apache.flink.streaming.siddhi.ExecutionSiddhiStream.ExecutionSiddhiStream.returnAsRowWithQueryId(ExecutionSiddhiStream.java:165)
    at org.apache.flink.streaming.siddhi.SiddhiCEPITCase.testSimplePojoStreamAndReturnRowWithQueryId(SiddhiCEPITCase.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.io.NotSerializableException: org.apache.flink.configuration.description.TextElement
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 45 more

So why does this happen, what changed between 1.13.1 and 1.14.0, and how can we fix it?

Upvotes: 1

Views: 616

Answers (2)

Chesnay Schepler

Reputation: 1280

Note that plain Java serialization still works for the ExecutionConfig; it is only the ClosureCleaner that rejects it, because it performs very strict serializability checks.
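A small self-contained sketch of why both statements can be true at once. It relies on the Java serialization rule that enum constants are written by name only, so any fields an enum constant holds are never serialized by ObjectOutputStream. `Description`, `Level` and `Config` are hypothetical stand-ins for `TextElement`, `ExecutionConfig.ClosureCleanerLevel` and `ExecutionConfig`; they are not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class EnumSerializationDemo {

    // Hypothetical stand-in for Flink's TextElement: deliberately NOT Serializable.
    static final class Description {
        final String text;

        Description(String text) {
            this.text = text;
        }
    }

    // Hypothetical stand-in for ClosureCleanerLevel: an enum holding a non-serializable field.
    enum Level {
        RECURSIVE(new Description("Clean the closure recursively."));

        final Description description;

        Level(Description description) {
            this.description = description;
        }
    }

    // Hypothetical stand-in for ExecutionConfig: a Serializable class with an enum-typed field.
    static final class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        Level level = Level.RECURSIVE;
    }

    // Returns true if plain Java serialization of o succeeds.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The enum constant is written by name, so its Description is never touched.
        System.out.println("Config serializable: " + isJavaSerializable(new Config()));
        // Serializing the enum's field on its own fails, as a strict field-by-field check would.
        System.out.println("Description serializable: " + isJavaSerializable(Level.RECURSIVE.description));
    }
}
```

Running it prints `Config serializable: true` followed by `Description serializable: false`. The second call appears to mirror what the ClosureCleaner does in RECURSIVE mode: the nested `ClosureCleaner.java:132` frames followed by `serializeObject` on a `TextElement` suggest it descends into the enum constant's fields and serializes the `TextElement` directly.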

As such, the underlying problem could be that the closure of your map function is unnecessarily large. The SiddhiOperatorContext that you pass into the method becomes part of the map function's closure, so check whether you can shrink that context so that it no longer relies on an ExecutionConfig.
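One way to act on this advice is to replace the anonymous MapFunction with a static nested class that carries only the data the map step needs. The sketch below assumes that the input streams used by each execution plan are already known when the job is built, which the original code does not confirm. `Route` and `AssignPlanIds` are hypothetical names, and a plain `Serializable` class stands in for Flink's `MapFunction` so that the example runs without Flink on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SmallClosureSketch {

    // Hypothetical stand-in for StreamRoute, reduced to what the map step touches.
    static final class Route implements Serializable {
        private static final long serialVersionUID = 1L;
        final String inputStreamId;
        final List<String> executionPlanIds = new ArrayList<>();

        Route(String inputStreamId) {
            this.inputStreamId = inputStreamId;
        }
    }

    // Hypothetical sketch: a static nested class (no hidden outer reference) standing in
    // for Flink's MapFunction. It holds only a precomputed planId -> used-stream-ids map
    // instead of the whole operator context, so no ExecutionConfig is reachable from it.
    static final class AssignPlanIds implements Serializable {
        private static final long serialVersionUID = 1L;
        private final LinkedHashMap<String, HashSet<String>> streamsByPlan = new LinkedHashMap<>();

        AssignPlanIds(Map<String, ? extends Set<String>> streamsByPlan) {
            // Copy into known-serializable collections, keeping the plan order.
            streamsByPlan.forEach((plan, streams) -> this.streamsByPlan.put(plan, new HashSet<>(streams)));
        }

        // Same logic as the original map(): tag the route with every plan that uses its input stream.
        Route map(Route value) {
            for (Map.Entry<String, HashSet<String>> e : streamsByPlan.entrySet()) {
                if (!e.getKey().isEmpty() && e.getValue().contains(value.inputStreamId)) {
                    value.executionPlanIds.add(e.getKey());
                }
            }
            return value;
        }
    }

    // Plain Java serialization, wrapping the checked IOException.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Set<String>> streamsByPlan = new LinkedHashMap<>();
        streamsByPlan.put("plan-1", Collections.singleton("inputStream"));
        streamsByPlan.put("plan-2", Collections.singleton("otherStream"));

        AssignPlanIds fn = new AssignPlanIds(streamsByPlan);
        System.out.println("serialized bytes: " + serialize(fn).length);
        System.out.println(fn.map(new Route("inputStream")).executionPlanIds); // [plan-1]
    }
}
```

In a real job, a class like `AssignPlanIds` would implement `MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>` and be passed to `dataStream.map(...)`. Because it would no longer capture `SiddhiOperatorContext`, the ClosureCleaner would have no path from the function to the ExecutionConfig.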

Upvotes: 0

Jim Wen

Reputation: 11

Thank you, David Anderson. This appears to be a bug introduced by the latest Flink commit to flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java.

Diffing the file shows that TextElement is now used in ClosureCleanerLevel, which is a member of the Serializable ExecutionConfig.

[Screenshot: TextElement in ClosureCleanerLevel]

In Flink Siddhi, the ExecutionConfig is serialized because it is used to serialize Flink data into Siddhi types on every TaskManager, so that is most likely the cause.

The simplest way to verify the problem is to run the following test against Flink 1.13.5 and 1.14.0: the exception is reproduced in 1.14.0. For this file, the only difference between 1.13.5 and 1.14.0 is that latest commit.

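import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.junit.Test;

@Test
public void testExecutionConfigSerializable() throws Exception {
    // Succeeds on Flink 1.13.5; throws the TextElement serialization exception on 1.14.0.
    ExecutionConfig config = new ExecutionConfig();
    ClosureCleaner.clean(config, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}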

Upvotes: 0
