David Moravek

Reputation: 201

Custom log4j appender in spark executor

I'm trying to use a custom log4j appender inside Spark executors in order to forward all logs to Apache Kafka.

The problem is that log4j is initialized before the fat jar's classloader (the one that contains the appender) is registered, so I end up with the following:

log4j:ERROR Could not instantiate class [kafka.producer.KafkaLog4jAppender].
java.lang.ClassNotFoundException: kafka.producer.KafkaLog4jAppender
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:260)
    at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
    at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
    at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
    at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
    at org.apache.spark.Logging$class.log(Logging.scala:51)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.log(CoarseGrainedExecutorBackend.scala:126)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:137)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:235)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
log4j:ERROR Could not instantiate appender named "KAFKA".
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Starting remoting
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:36918]
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:36918]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Starting remoting
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:40067]
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:40067]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-5] INFO Remoting: Remoting shut down
....

The problem seems to be right here: https://github.com/apache/spark/blob/v1.3.1/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L126
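
To double-check this reading of the trace, a small check like the one below can be run with the same classpath as the executor JVM. It is only a sketch: the class `AppenderVisibilityCheck` and its `isVisible` helper are made-up names for illustration, and it only tests visibility from the system class loader, which is where the trace above shows the lookup ending up (`sun.misc.Launcher$AppClassLoader`).

```java
public class AppenderVisibilityCheck {

    // Hypothetical helper: returns true if the named class can be found through
    // the given loader (or its parents). The class is not initialized.
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The system (application) class loader only sees the JVM classpath,
        // not jars that are added later through a child class loader.
        ClassLoader system = ClassLoader.getSystemClassLoader();

        // Default to the appender class named in the log4j error above.
        String[] names = args.length > 0
                ? args
                : new String[] {"kafka.producer.KafkaLog4jAppender"};

        for (String name : names) {
            System.out.println(name + " visible to system class loader: "
                    + isVisible(name, system));
        }
    }
}
```

If this reports `false` on the executor's own classpath while the class does exist inside the application fat jar, that would be consistent with log4j being configured before the fat jar is visible.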

Is there any easy way to solve this? We are currently using Spark 1.3.x.

Thanks

David

Upvotes: 9

Views: 5292

Answers (3)

Benak Raj

Reputation: 328

I was facing the same issue, so I will post what worked for me. It turns out the package name of the KafkaLog4jAppender class changed in Kafka 0.9. I added the following dependency to my pom:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>0.9.0.0</version>
</dependency>

and changed my log4j.properties from

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender

to

log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender 

Upvotes: 1

David Moravek

Reputation: 201

I ended up submitting an extra jar with the logging dependencies and loading it before the user classpath.

LOG_JAR="${THISDIR}/../lib/logging.jar"
spark-submit ...... \
  --files "${LOG4J_CONF},${LOG_JAR}" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=`basename ${LOG4J_CONF}`" \
  --conf "spark.driver.extraClassPath=`basename ${LOG_JAR}`" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=`basename ${LOG4J_CONF}`" \
  --conf "spark.executor.extraClassPath=`basename ${LOG_JAR}`" \
  ...

https://issues.apache.org/jira/browse/SPARK-10881

Upvotes: 5

xiezefan

Reputation: 631

kafka.producer.KafkaLog4jAppender is in Kafka's hadoop-producer artifact, so you can add this dependency to fix it:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>hadoop-producer</artifactId>
    <version>0.8.0</version>
</dependency>

Upvotes: 0
