Reputation: 17026
I have a Python Spark program which I run with spark-submit. I want to put logging statements in it:
logging.info("This is an informative message.")
logging.debug("This is a debug message.")
I want to use the same logger that Spark is using so that the log messages come out in the same format and the level is controlled by the same configuration files. How do I do this?
I've tried putting the logging statements in the code and starting out with a logging.getLogger(). In both cases I see Spark's log messages but not mine. I've been looking at the Python logging documentation, but haven't been able to figure it out from there.
Not sure if this is something specific to scripts submitted to Spark or just me not understanding how logging works.
Upvotes: 71
Views: 110379
Reputation: 76
You need to make the Spark log reachable from the driver and all executors, so we created a logging class, shipped it as a job dependency, and loaded it on each executor.
class Log4j:
    def __init__(self, spark_session):
        conf = spark_session.sparkContext.getConf()
        app_id = conf.get('spark.app.id')
        app_name = conf.get('spark.app.name')
        log4jlogger = spark_session._jvm.org.apache.log4j
        prefix_msg = '<' + app_id + ' : ' + app_name + '> '
        print(prefix_msg)  # Optional: show which logger prefix is being used.
        self.logger = log4jlogger.LogManager.getLogger(prefix_msg)

    def warn(self, msg):
        # Log a warning message.
        self.logger.warn(msg)

    def error(self, msg):
        # Log an error message.
        self.logger.error(msg)

    def info(self, msg):
        # Log an informational message.
        self.logger.info(msg)
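For example, a hypothetical driver-side usage of the class above (assuming a SparkSession already created and named spark):

# Hypothetical usage, assuming a SparkSession named `spark`.
log = Log4j(spark)
log.info("Job started")
log.warn("Something looks off")
log.error("Something went wrong")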
Upvotes: 0
Reputation:
You can implement the logging.Handler interface in a class that forwards log messages to log4j under Spark. Then use logging.root.addHandler() (and, optionally, logging.root.removeHandler()) to install that handler.
The handler should have a method like the following:
def emit(self, record):
    """Forward a log message to log4j."""
    Logger = self.spark_session._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(record.name)
    if record.levelno >= logging.CRITICAL:
        # Fatal and critical seem about the same.
        logger.fatal(record.getMessage())
    elif record.levelno >= logging.ERROR:
        logger.error(record.getMessage())
    elif record.levelno >= logging.WARNING:
        logger.warn(record.getMessage())
    elif record.levelno >= logging.INFO:
        logger.info(record.getMessage())
    elif record.levelno >= logging.DEBUG:
        logger.debug(record.getMessage())
    else:
        pass
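Putting it together, a minimal sketch of a complete handler class (the constructor wiring here is an assumption; the answer itself only shows emit()):

import logging

class CustomHandler(logging.Handler):
    """Forward Python logging records to Spark's log4j logger."""

    def __init__(self, spark_session):
        # Assumption: the handler keeps a reference to the SparkSession so that
        # emit() can reach the JVM gateway via spark_session._jvm.
        super().__init__()
        self.spark_session = spark_session

    def emit(self, record):
        # Same forwarding logic as shown above, abbreviated here.
        Logger = self.spark_session._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(record.name)
        if record.levelno >= logging.ERROR:
            logger.error(record.getMessage())
        elif record.levelno >= logging.WARNING:
            logger.warn(record.getMessage())
        else:
            logger.info(record.getMessage())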
Installing the handler should go immediately after you initialise your Spark session:
spark = SparkSession.builder.appName("Logging Example").getOrCreate()
handler = CustomHandler(spark)

# Replace the default handlers with the log4j forwarder.
root_handlers = logging.root.handlers[:]
for h in root_handlers:
    logging.root.removeHandler(h)
logging.root.addHandler(handler)

# Now you can log stuff.
logging.debug("Installed log4j log handler.")
There's a more complete example here: https://gist.github.com/thsutton/65f0ec3cf132495ef91dc22b9bc38aec
Upvotes: 2
Reputation: 89
import logging
# Logger
logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)
Simplest way to log from pyspark!
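For example, a quick sketch of using the logger defined above from the driver:

# DEBUG and above will be emitted by the handler that basicConfig installed.
logger.info("Job starting")
logger.debug("Configuration loaded")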
Upvotes: 4
Reputation: 6160
You need to get the logger for Spark itself; by default getLogger() will return the logger for your own module. Try something like:
logger = logging.getLogger('py4j')
logger.info("My test info statement")
It might also be 'pyspark' instead of 'py4j'.
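If you are unsure which logger name Spark's Python side is using, one way to inspect the names that already exist (a sketch using only the standard library's logger registry, nothing Spark-specific) is:

import logging

# List every logger name registered so far; look for entries such as
# 'py4j' or 'pyspark' and reuse whichever one carries Spark's output.
for name in sorted(logging.root.manager.loggerDict):
    print(name)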
If the function that you use in your Spark program (and which does some logging) is defined in the same module as the main function, it will give a serialization error. This is explained here, and an example by the same person is given here. I also tested this on Spark 1.3.1.
EDIT:
To change the logging from STDERR to STDOUT you will have to remove the current StreamHandler and add a new one.
Find the existing StreamHandler (this line can be removed when finished):
print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
There will probably only be a single one, but if not you will have to update the position accordingly.
logger.removeHandler(logger.handlers[0])
Add a new handler for sys.stdout:
import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
Upvotes: 25
Reputation: 29645
We needed to log from the executors, not from the driver node. So we did the following:
We created a /etc/rsyslog.d/spark.conf on all of the nodes (using a bootstrap action with Amazon Elastic MapReduce) so that the core nodes forwarded syslog local1 messages to the master node.
On the master node, we enabled the UDP and TCP syslog listeners, and we set it up so that all local1 messages got logged to /var/log/local1.log.
We created a Python logging module Syslog logger in our map function.
Now we can log with logging.info(). ...
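For reference, a sketch of what the syslog logger set up inside the map function might look like (the master hostname, port, and facility below are illustrative placeholders, not the answer's exact configuration):

import logging
import logging.handlers

def get_syslog_logger(name='spark-worker', master_host='master-node'):
    # master_host is a placeholder for the node running the syslog listener.
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking duplicate handlers on reuse
        handler = logging.handlers.SysLogHandler(
            address=(master_host, 514),
            facility=logging.handlers.SysLogHandler.LOG_LOCAL1,
        )
        handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger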
One of the things we discovered is that the same partition is being processed simultaneously on multiple executors. Apparently Spark does this all the time, when it has extra resources. This handles the case when an executor is mysteriously delayed or fails.
Logging in the map functions has taught us a lot about how Spark works.
Upvotes: 8
Reputation: 79
The key to making pyspark and Java log4j interact is the JVM gateway. Below is Python code; the conf is missing the URL, but this is about logging.
import os

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars", "%s/jars/log4j-1.2.17.jar" % my_jars)

spark = SparkSession\
    .builder\
    .appName("DB2_Test")\
    .config(conf=myconf)\
    .getOrCreate()

Logger = spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")
Upvotes: 3
Reputation: 26201
In my case, I am just happy to get my log messages added to the workers' stderr, along with the usual Spark log messages.
If that suits your needs, then the trick is to redirect the particular Python logger to stderr.
For example, the following, inspired by this answer, works fine for me:
import logging
import sys

def getlogger(name, level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # or else, as I found out, we keep adding handlers and duplicate messages
        pass
    else:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger
Usage:
def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...
Output (plus a few surrounding lines for context):
17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
Upvotes: 7
Reputation: 3250
You can get the logger from the SparkContext object:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
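If you are starting from a SparkSession rather than a bare SparkContext, the same pattern applies (a sketch, assuming a session named spark); note that this only works on the driver, because the py4j gateway is not available inside executor tasks:

sc = spark.sparkContext
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")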
Upvotes: 79