Reputation: 149608
Running Flink 1.9.0 with Scala 2.12 and attempting to publish data to Kafka using the flink-connector-kafka
, everything works fine when debugging locally. Once I submit the job to the cluster, I get the following java.lang.LinkageError
at runtime which fails to run the job:
java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/kafka/clients/producer/ProducerRecord"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:561)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1202)
at sun.reflect.GeneratedMethodAccessor358.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:370)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
When looking at the loaded classes with -verbose:class
, I see the class is being loaded several times:
taskmanager [Loaded org.apache.kafka.clients.producer.ProducerRecord from file:/tmp/blobStore-8cf95113-e767-4073-9b1b-e579d46c0283/job_f0c3db8b84dd38e83f92ecf1bc61b698/blob_p-c327eb8f4333a638b2b7049049368f23254aeb9c-03045e6d6a9c8f3c7dacdded8cb97d6e]
taskmanager [Loaded org.apache.kafka.clients.producer.ProducerRecord from file:/tmp/blobStore-8cf95113-e767-4073-9b1b-e579d46c0283/job_f0c3db8b84dd38e83f92ecf1bc61b698/blob_p-c327eb8f4333a638b2b7049049368f23254aeb9c-03045e6d6a9c8f3c7dacdded8cb97d6e]
Where the class is being loaded from the same Uber-JAR that I submit to Flink. Moreover, there are no multiple transitive dependencies loading ProducerRecord
, my JAR is the only supplier of that dependency.
build.sbt
:
lazy val flinkVersion = "1.9.0"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-table-planner" % flinkVersion,
"org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion,
"org.apache.flink" %% "flink-container" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" % "flink-json" % flinkVersion % "provided",
"org.apache.flink" % "flink-avro" % flinkVersion % "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
"org.apache.flink" %% "flink-runtime-web" % flinkVersion % "provided",
"org.apache.flink" %% "flink-cep" % flinkVersion
)
Upvotes: 2
Views: 2000
Reputation: 149608
For an unknown reason, setting the classloader.resolve-order
property to parent-first
as mentioned in the Apache Flink mailing list resolves the issue. I am still baffled as to why it works, as there should be no dependency clashes between the child and parent classloader loading different versions of this dependency (as it is not provided out of the box with the flink-dist
I am using).
In the Flink documentation under "Debugging Classloading", there's a section which talks about this parent-child relationship:
In setups where dynamic classloading is involved (plugin components, Flink jobs in session setups), there is a hierarchy of typically two ClassLoaders: (1) Java’s application classloader, which has all classes in the classpath, and (2) the dynamic plugin/user code classloader. for loading classes from the plugin or the user-code jar(s). The dynamic ClassLoader has the application classloader as its parent.
By default, Flink inverts classloading order, meaning it looks into the dynamic classloader first, and only looks into the parent (application classloader) if the class is not part of the dynamically loaded code.
The benefit of inverted classloading is that plugins and jobs can use different library versions than Flink’s core itself, which is very useful when the different versions of the libraries are not compatible. The mechanism helps to avoid the common dependency conflict errors like IllegalAccessError or NoSuchMethodError. Different parts of the code simply have separate copies of the classes (Flink’s core or one of its dependencies can use a different copy than the user code or plugin code). In most cases, this works well and no additional configuration from the user is needed.
I have yet to understand why loading ProducerRecord
happens more than once, or what this "different type" in the exception message refers to (greping on the result of -verbose:class
yielded only a single path for ProducerRecord
).
Upvotes: 5