Reputation: 2227
According to the Flink documentation, dynamic class loading can be avoided as follows: when running a setup where the Flink JobManager and TaskManagers are exclusive to one particular job, one can put JAR files directly into the /lib folder to make sure they are part of the classpath and not loaded dynamically.
However, when I add the JARs to the /lib folder, I get the following exception. Is there a workaround for this error?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) ~[iot-mirror-device.jar:na]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[iot-mirror-device.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) ~[na:1.8.0_91]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) ~[na:1.8.0_91]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) ~[iot-mirror-device.jar:na]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) ~[iot-mirror-device.jar:na]
... 4 common frames omitted
Timestamp=2018-03-26 13:46:42,433 LogLevel=INFO ThreadId=[flink-akka.actor.default-dispatcher-6] Class=o.a.f.r.e.ExecutionGraph Msg=Source: Custom Source -> Sink: Unnamed (1/1) (3f12f6953a235eb43f07cdf7966b5fcf) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) ~[iot-mirror-device.jar:na]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[iot-mirror-device.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) ~[na:1.8.0_91]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) ~[na:1.8.0_91]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024) ~[na:1.8.0_91]
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) ~[na:1.8.0_91]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) ~[na:1.8.0_91]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) ~[iot-mirror-device.jar:na]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) ~[iot-mirror-device.jar:na]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) ~[iot-mirror-device.jar:na]
... 4 common frames omitted
Upvotes: 3
Views: 2915
Reputation: 270
I had the same issue. Check the versions of the JARs you added to Flink's lib folder: they should exactly match the versions used in your job JAR's pom.xml. In my case these were flink-table_2.11-1.7.2.jar and flink-json-1.7.2.jar.
Upvotes: 1
Reputation: 97
The LinkedMap class is being loaded twice, by two different classloaders, and an instance of one copy is being assigned to a field typed with the other. This is an error of the "X cannot be cast to X" kind.
A common cause is a library that is not compatible with Flink's inverted (child-first) classloading approach. You can usually resolve this by adding the following setting to conf/flink-conf.yaml and restarting Flink:
classloader.resolve-order: parent-first
This should solve the issue.
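To illustrate why "X cannot be cast to X" can happen at all, here is a minimal, self-contained sketch in plain Java. It does not use Flink; the class names `SameClassTwoLoaders`, `Payload` and `IsolatingLoader` are hypothetical and exist only for this demonstration. `Payload` stands in for a library class such as LinkedMap, and `IsolatingLoader` imitates a child-first loader by defining that one class itself instead of delegating to its parent.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class SameClassTwoLoaders {

    /** Stand-in for a library class such as LinkedMap (hypothetical, for illustration). */
    public static class Payload {}

    /** Child-first for one class only: defines the target itself, delegates everything else. */
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length); // second copy of the class
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the compiled class file through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads a second copy of Payload through an isolating loader. */
    static Class<?> loadIsolatedCopy() throws ClassNotFoundException {
        String name = Payload.class.getName();
        ClassLoader parent = SameClassTwoLoaders.class.getClassLoader();
        return new IsolatingLoader(parent, name).loadClass(name);
    }

    /** True if both loaders yield the very same Class object. */
    static boolean sameClassAcrossLoaders() throws Exception {
        return loadIsolatedCopy() == Payload.class;
    }

    /** True if casting an instance of the isolated copy to Payload throws. */
    static boolean castFails() throws Exception {
        Object other = loadIsolatedCopy().getDeclaredConstructor().newInstance();
        try {
            Payload p = (Payload) other;
            return p == null; // not reached when the cast throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("same Class object: " + sameClassAcrossLoaders());
        System.out.println("cast fails: " + castFails());
    }
}
```

Both copies have the same fully qualified name, but the JVM identifies a class by its name together with its defining classloader, so the two are unrelated types. With parent-first resolution the class would be taken from the parent classpath only, which is why the setting above can make the conflict disappear.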
Upvotes: 3
Reputation: 2913
At first glance it looks like you added Apache commons-collections version 4.x to your lib dir. LinkedMap became generic in version 4. That might cause your problem, because Flink depends on an older commons-collections version. This line is the likely source of the trouble:
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
You can find it in this file in Flink's GitHub repo.
A simple approach would be to remove commons-collections 4.x from the lib dir. If you are lucky, your dependencies can also work with the version Flink provides. Otherwise you are in trouble ;-). In that case you may have to shade commons-collections.
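Before removing or shading anything, it can help to confirm which JAR a class is actually loaded from. The following is a small diagnostic sketch using only standard JDK calls; the class name `WhereIsClass` and the method `describe` are hypothetical helpers, not part of Flink. The assumption is that you would call `describe` on the suspect class (for example LinkedMap) from inside your job and compare the result with what the framework side reports.

```java
import java.security.CodeSource;

public class WhereIsClass {

    /** Describes which classloader defined the class and where its bytes came from. */
    static String describe(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // Classes defined by the bootstrap loader have no code source.
        String location = (src == null || src.getLocation() == null)
                ? "<bootstrap or unknown>"
                : src.getLocation().toString();
        return c.getName() + " loaded by " + c.getClassLoader() + " from " + location;
    }

    public static void main(String[] args) throws Exception {
        // Pass a fully qualified class name, or fall back to a JDK class.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(name)));
    }
}
```

If the same class name is reported with two different loaders or two different JAR locations, that is the duplicate to remove or shade.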
Hope this helps.
Upvotes: 0