Reputation: 1383
I am currently working on a Flink application that uses some of the Hadoop dependencies to write data to an S3 location. It works fine in my local environment, but when I deploy the application on an EMR cluster it throws an exception related to a compatibility issue.
The error message I am getting is:
java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778) ....
I have included the Maven dependency for the flink-hadoop-compatibility-2.10 JAR in my POM, but it is not being picked up. The Flink version I am using is 1.2.0.
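For reference, the dependency declaration in my POM looks roughly like this (the _2.10 Scala suffix and the 1.2.0 version are assumed here to match the Flink 1.2.0 setup described above):
<!-- Assumed artifact coordinates: Scala 2.10 suffix and version 1.2.0, matching the Flink version mentioned above -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.10</artifactId>
    <version>1.2.0</version>
</dependency>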
However, when I explicitly copy the compatibility JAR to the ${FLINK-HOME}/lib location, I do not get any exception and am able to run the Flink application successfully.
Is there any way to run the application without deploying the JAR file to ${FLINK-HOME}/lib?
OR
What modifications are required in the POM dependencies so that the application detects the JAR and it is no longer necessary to copy the compatibility JAR to the ${FLINK-HOME}/lib location?
Upvotes: 3
Views: 926
Reputation: 1334
package org.apache.flink.api.java.typeutils;

public class TypeExtractor {

    /** The name of the class representing Hadoop's writable */
    private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";

    private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";

    // visible for testing
    public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
        checkNotNull(clazz);

        Class<?> typeInfoClass;
        try {
            typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not load the TypeInformation for the class '"
                    + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
        }
        ...
    }
}
This is because org.apache.hadoop.io.Writable is meant to be loaded by TypeExtractor.class.getClassLoader(), which is the AppClassLoader, while the submitted Flink JAR is loaded by the ParentFirstClassLoader, a child of the AppClassLoader. A parent class loader cannot see classes that exist only on its child's classpath, so the AppClassLoader cannot load org.apache.hadoop.io.Writable from your Flink JAR.
I'm not sure if it's a bug, but changing the class loader to Thread.currentThread().getContextClassLoader() would make it work without copying the flink-hadoop-compatibility JAR file to the ${FLINK-HOME}/lib location.
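For illustration, here is a minimal, self-contained sketch of the kind of lookup that change would amount to. The class PatchedWritableTypeInfoLookup and its method name are hypothetical (this is not code that ships with Flink 1.2.0); only the class-name constants and the error message come from the TypeExtractor source quoted above.
public class PatchedWritableTypeInfoLookup {

    private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";
    private static final String HADOOP_WRITABLE_TYPEINFO_CLASS =
            "org.apache.flink.api.java.typeutils.WritableTypeInfo";

    public static Class<?> loadWritableTypeInfoClass() {
        // Hypothetical change: inside Flink's task threads the context class loader
        // is the user-code class loader, which should be able to see classes bundled
        // in the submitted fat JAR, unlike TypeExtractor.class.getClassLoader().
        ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            return Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, userCodeClassLoader);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not load the TypeInformation for the class '"
                    + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
        }
    }
}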
Upvotes: 2
Reputation: 1383
After looking into various posts and experimenting with POM files, I think that with the current version of Apache Flink (1.2.0) it is required to copy (deploy) the JAR file to the ${FLINK-HOME}/lib location.
Upvotes: 1