Ravi Teja

Reputation: 377

Spark Streaming Driver and App work files cleanup

I am running Spark 2.0.2 and have deployed a streaming job in cluster deploy mode on a Spark standalone cluster. The streaming job itself works fine, but there is an issue with the application's and driver's stderr files that are created in the work directory under SPARK_HOME. Since the streaming job runs indefinitely, these files only grow in size, and I have no idea how to control that.
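To illustrate, this is roughly where those files live on my nodes (the driver directory is the one from my current run; the application directory name is just an example, not a real ID from my cluster):

# Per-executor logs: one sub-directory per application and executor (example ID)
ls /mnt/spark2.0.2/work/app-20170210124424-0000/0/
#   stderr  stdout

# Driver logs for a job submitted with --deploy-mode cluster
ls /mnt/spark2.0.2/work/driver-20170210124424-0001/
#   stderr  stdout  StreamingJob-assembly-0.1.0.jar

# Both keep growing for as long as the streaming job runs
du -sh /mnt/spark2.0.2/work/*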

I have tried the following solutions, even though they are not exactly related to the problem at hand, but neither of them worked:

  1. Apache Spark does not delete temporary directories
  2. How to log using log4j to local file system inside a Spark application that runs on YARN?

Can anyone please tell me how to limit the size of the files being created?

P.S.: I have also tried adding the line below to conf/spark-env.sh and restarting the cluster, but it didn't help for a running streaming application (as far as I can tell, the worker cleanup only removes the directories of stopped applications):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=60 -Dspark.worker.cleanup.appDataTtl=60"

EDIT:

@YuvalItzchakov I have tried your suggestion, but it didn't work. The driver's stderr log looks like this:

Launch Command: "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" "-cp" "/mnt/spark2.0.2/conf/:/mnt/spark2.0.2/jars/*" "-Xmx2048M" "-Dspark.eventLog.enabled=true" "-Dspark.eventLog.dir=/mnt/spark2.0.2/JobsLogs" "-Dspark.executor.memory=2g" "-Dspark.deploy.defaultCores=2" "-Dspark.io.compression.codec=snappy" "-Dspark.submit.deployMode=cluster" "-Dspark.shuffle.consolidateFiles=true" "-Dspark.shuffle.compress=true" "-Dspark.app.name=Streamingjob" "-Dspark.kryoserializer.buffer.max=128M" "-Dspark.master=spark://172.16.0.27:7077" "-Dspark.shuffle.spill.compress=true" "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" "-Dspark.cassandra.input.fetch.size_in_rows=20000" "-Dspark.executor.extraJavaOptions=-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "-Dspark.jars=file:/mnt/spark2.0.2/sparkjars/StreamingJob-assembly-0.1.0.jar" "-Dspark.executor.instances=10" "-Dspark.driver.extraJavaOptions=-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "-Dspark.driver.memory=2g" "-Dspark.rpc.askTimeout=10" "-Dspark.eventLog.compress=true" "-Dspark.executor.cores=1" "-Dspark.driver.supervise=true" "-Dspark.history.fs.logDirectory=/mnt/spark2.0.2/JobsLogs" "-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "org.apache.spark.deploy.worker.DriverWrapper" "spark://[email protected]:34475" "/mnt/spark2.0.2/work/driver-20170210124424-0001/StreamingJob-assembly-0.1.0.jar" "Streamingjob"
========================================

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/10 12:44:26 INFO SecurityManager: Changing view acls to: cassuser
17/02/10 12:44:26 INFO SecurityManager: Changing modify acls to: cassuser
17/02/10 12:44:26 INFO SecurityManager: Changing view acls groups to: 
17/02/10 12:44:26 INFO SecurityManager: Changing modify acls groups to: 

And my log4j.xml file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
    <appender name="stdout" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="TRACE"/>
        <param name="File" value="stdout"/>
        <param name="maxFileSize" value="1MB"/>
        <param name="maxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="ALL" />
            <param name="levelMax" value="OFF" />
        </filter>
    </appender>

    <appender name="stderr" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="WARN"/>
        <param name="File" value="stderr"/>
        <param name="maxFileSize" value="1MB"/>
        <param name="maxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
        </layout>
    </appender>
</log4j:configuration>

Note that I removed the following root tag from the XML in your answer, because it produces an error (presumably because it references an appender named "console", which is not defined anywhere in the file):

<root>
    <appender-ref ref="console"/>
</root>
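If I were to keep a root logger, I assume it would have to reference the appenders that are actually defined in this file, something like the following (just my untested guess):

<root>
    <!-- assumed fix, not tested: point the root logger at the file appenders defined above -->
    <priority value="INFO"/>
    <appender-ref ref="stdout"/>
    <appender-ref ref="stderr"/>
</root>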

Upvotes: 0

Views: 1066

Answers (1)

Yuval Itzchakov

Reputation: 149618

You can use a custom log4j xml file for that.

First, declare your XML file:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
    <appender name="stdout" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="TRACE"/>
        <param name="File" value="stdout"/>
        <param name="maxFileSize" value="50MB"/>
        <param name="maxBackupIndex" value="100"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="ALL" />
            <param name="levelMax" value="OFF" />
        </filter>
    </appender>

    <appender name="stderr" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="WARN"/>
        <param name="File" value="stderr"/>
        <param name="maxFileSize" value="50MB"/>
        <param name="maxBackupIndex" value="100"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
        </layout>
    </appender>
    <root>
        <appender-ref ref="console"/>
    </root>
</log4j:configuration>

Then, when you submit your streaming job, you need to point the driver and the executors at the log4j.xml file via extraJavaOptions:

spark-submit \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///path/to/log4j.xml" \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///path/to/log4j.xml"

Note that the path may be different on the driver and worker nodes, depending on how you deploy your JAR and files to Spark. You said you're using cluster mode, so I assume you're manually distributing the JAR and the extra files; anyone running this in client mode will also need to ship the XML file via the --files flag.
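For that client-mode case, the submission might look roughly like the sketch below. The main class and JAR path are placeholders, and the executor-side file: reference assumes the shipped log4j.xml ends up in the executor's working directory, so you may need to adjust it for your deployment:

# Rough client-mode sketch: --files ships log4j.xml to the executors,
# while the driver (running locally) reads it from the local path.
# The class name and JAR path below are placeholders.
spark-submit \
--deploy-mode client \
--files /path/to/log4j.xml \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///path/to/log4j.xml" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.xml" \
--class com.example.StreamingJob \
/path/to/StreamingJob-assembly-0.1.0.jar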

Upvotes: 0
