Reputation: 11
I am trying to submit a job to my Flink cluster, but I keep running into the error below:
2021-05-03 17:14:32
java.lang.NoSuchMethodError: org/apache/flink/api/common/state/OperatorStateStore.getSerializableListState(Ljava/lang/String;)Lorg/apache/flink/api/common/state/ListState; (loaded from file:/opt/flink/lib/flink-dist_2.11-1.11.3.jar by jdk.internal.loader.ClassLoaders$AppClassLoader@c7a20636) called from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase (loaded from file:/tmp/blobStore-d962f26c-fc16-4ff4-89da-4d86ed60c35e/job_fdc8c054e20b751b6a6f549af602c3d2/blob_p-5fcb7b854786da736df1bbd47aa02017c714f655-34f669043602e72ef3faf32247ab2b17 by org.apache.flink.util.ChildFirstClassLoader@3952d030).
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:858)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$274/0x0000000014046b10.run(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:836)
I believe the issue is a version mismatch (I assume with the Kafka connector I am using), but I haven't had any luck resolving it. I've tried setting my Flink version to match my cluster (1.11.3), but the error persists. I'm also not sure how to use the maven-shade-plugin to resolve this. Any help would be appreciated!
Upvotes: 1
Views: 2661
Reputation: 43499
The release notes for Flink 1.11 state:
Removal of deprecated state access methods (FLINK-17376)
We removed deprecated state access methods RuntimeContext#getFoldingState(), OperatorStateStore#getSerializableListState() and OperatorStateStore#getOperatorState(). This means that some code that was compiled against Flink 1.10 will not work with a Flink 1.11 cluster. An example of this is our Kafka connector which internally used OperatorStateStore.getSerializableListState.
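To confirm which side of this change a given classpath is on, a small reflection probe can help. The following is only a sketch: the class name MethodPresenceCheck and the helper hasMethod are hypothetical names introduced here, not part of Flink. It asks the current classpath whether OperatorStateStore still exposes getSerializableListState(String), the method named in the NoSuchMethodError.

```java
public class MethodPresenceCheck {

    /**
     * Returns true if the named class can be loaded from this class's
     * classloader and exposes a public method with the given name and
     * parameter types. Returns false if the class or the method is missing.
     * (Hypothetical helper, not a Flink API.)
     */
    public static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // Load without initializing; we only want to inspect the signature.
            Class<?> cls = Class.forName(className, false, MethodPresenceCheck.class.getClassLoader());
            cls.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names are taken from the error message in the question.
        boolean present = hasMethod(
                "org.apache.flink.api.common.state.OperatorStateStore",
                "getSerializableListState",
                String.class);
        System.out.println("OperatorStateStore.getSerializableListState(String) present: " + present);
    }
}
```

Run with the cluster's flink-dist jar on the classpath, this should report the method as absent on 1.11 if the release note above applies. A connector jar that still calls the method was therefore built against an older Flink version.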
You should always expect to recompile your user jars when upgrading to new versions of Flink. Binary compatibility across minor version updates (e.g., from 1.10.x to 1.11.y) is not guaranteed. (It's unusual for breaking changes to be introduced in patch releases, but it has happened once or twice.)
Recap: everything involved -- your user jar, its dependencies, and the cluster -- should all be using the same version of Flink.
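One quick way to spot a mismatch is to compare the versions embedded in the jar file names. The sketch below assumes the common artifact_scalaVersion-flinkVersion.jar naming, as in flink-dist_2.11-1.11.3.jar from the error message. The class FlinkJarVersions and the connector file name used in main are hypothetical and only illustrate the idea.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkJarVersions {

    // Matches names like "flink-dist_2.11-1.11.3.jar":
    // group 1 = artifact, group 2 = Scala version, group 3 = Flink version.
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)_(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)\\.jar$");

    /** Extracts the Flink version from a jar file name, if the name follows the assumed pattern. */
    public static Optional<String> flinkVersion(String jarFileName) {
        Matcher m = JAR_NAME.matcher(jarFileName);
        return m.matches() ? Optional.of(m.group(3)) : Optional.empty();
    }

    /** Returns true only if both names parse and carry exactly the same Flink version. */
    public static boolean sameFlinkVersion(String jarA, String jarB) {
        Optional<String> a = flinkVersion(jarA);
        Optional<String> b = flinkVersion(jarB);
        return a.isPresent() && a.equals(b);
    }

    public static void main(String[] args) {
        String clusterJar = "flink-dist_2.11-1.11.3.jar";               // from the error message
        String connectorJar = "flink-connector-kafka_2.11-1.10.0.jar";  // hypothetical example
        System.out.println("cluster:   " + flinkVersion(clusterJar).orElse("unknown"));
        System.out.println("connector: " + flinkVersion(connectorJar).orElse("unknown"));
        System.out.println("same version: " + sameFlinkVersion(clusterJar, connectorJar));
    }
}
```

File names are only a hint. A shaded user jar can bundle connector classes from a different version than its own name suggests, so the dependency versions declared in the build remain the authoritative place to check.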
Upvotes: 1