Reputation: 163
I have tried using the following in ~/.config/dask/distributed.yaml and ~/.config/dask/yarn.yaml:
logging-file-config: "/path/to/config.ini"
or
logging:
  version: 1
  disable_existing_loggers: false
  root:
    level: INFO
    handlers: [consoleHandler]
  handlers:
    consoleHandler:
      class: logging.StreamHandler
      level: INFO
      formatter: sample_formatter
      stream: ext://sys.stderr
  formatters:
    sample_formatter:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
and then in my function that gets evaluated at the worker:
import logging
from distributed.worker import logger
import dask
from dask.distributed import Client
from dask_yarn import YarnCluster

log = logging.getLogger(__name__)

@dask.delayed
def worker_func(args):
    logger.info("This will show up in the worker logs")
    log.info("This does not show up in worker logs")
    return

if __name__ == "__main__":
    dag_1 = {'worker_func': (worker_func, arg_1)}
    tasks = dask.get(dag_1, 'load-1')
    log.info("This also shows up in logs, and custom formatted")
    cluster = YarnCluster()
    client = Client(cluster)
    dask.compute(tasks)
When I try to view the yarn logs using:
yarn logs -applicationId {application_id}
I do not see the log from log.info inside worker_func, but I do see the logs from distributed.worker.logger and from outside that function on the console. I also tried using client.get_worker_logs(), but that returned an empty dictionary. Is there a way to see customized logs from inside the function that gets evaluated at a worker?
Upvotes: 1
Views: 1265
Reputation: 2445
There's a lot going on in this question, so I'm going to answer "How do I configure logging for dask-yarn workers" and hope everything else becomes clear by answering that.
Dask's configuration system is loaded locally on the machine you start a dask cluster from (usually the edge node). This configuration is not distributed to the workers automatically; you're responsible for doing that yourself. You have a few options here:

- Write a configuration file to /etc/dask/ on every node, which will affect all users.
- Write a configuration file to {prefix}/etc/dask/, where prefix is sys.prefix. This works well when you bundle the configuration into the environment you ship to the workers, as shown below.

For example, if you have a conda environment at /path/to/environment, you'd do the following to bundle the configuration:
# Create the configuration directory in the environment
mkdir -p /path/to/environment/etc/dask/
# Add your configuration to this directory
mv config.yaml /path/to/environment/etc/dask/config.yaml
# Package the environment
conda pack -p /path/to/environment -o environment.tar.gz
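You'd then point the cluster at the packed environment when creating it. A minimal sketch (environment= is dask-yarn's documented way to ship a conda-pack archive; environment.tar.gz is just the file produced above):

from dask.distributed import Client
from dask_yarn import YarnCluster

# Ship the packed environment (including etc/dask/config.yaml) to every
# worker container; dask loads the bundled configuration on startup.
cluster = YarnCluster(environment="environment.tar.gz")
client = Client(cluster)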
Any configuration values set in config.yaml will now be available on all the worker nodes. An example configuration file setting some logging configuration would be:
logging:
  version: 1
  root:
    level: INFO
    handlers: [consoleHandler]
  handlers:
    consoleHandler:
      class: logging.StreamHandler
      level: INFO
      formatter: sample_formatter
      stream: ext://sys.stderr
  formatters:
    sample_formatter:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
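With that configuration in place on the workers, output from any logger that propagates to the root logger is formatted and written to the container's stderr. A quick sketch of what that looks like from a task (the function name and argument here are illustrative):

import logging
import dask
from dask.distributed import Client
from dask_yarn import YarnCluster

@dask.delayed
def worker_func(x):
    # Propagates to the worker's configured root logger, so it is formatted
    # by sample_formatter and written to the container's stderr.
    logging.getLogger(__name__).info("processing %s", x)
    return x

cluster = YarnCluster()  # assumes the packed-environment setup above
client = Client(cluster)
dask.compute(worker_func(1))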
Logs from completed dask-yarn applications can be retrieved using the YARN CLI:
yarn logs -applicationId <application-id>
Logs for running dask-yarn applications can be retrieved using client.get_worker_logs(). Note that these logs will only contain logs written to the distributed.worker logger. You cannot write to your own logger and have it appear in the output of client.get_worker_logs(). To write to this logger, get it via:
import logging
logger = logging.getLogger("distributed.worker")
logger.info("Writing with the worker logger")
Any logger appropriately configured to log to stdout or stderr will appear in the logs accessed via the yarn CLI, but only the distributed.worker logger output will also be available to get_worker_logs().
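For completeness, here's a sketch of writing to that logger from a task and then pulling the logs back while the application is running (the task body is illustrative; get_worker_logs() returns a mapping of worker address to recent log records):

import logging
from dask.distributed import Client
from dask_yarn import YarnCluster

def task():
    # Only records written to this logger show up in get_worker_logs().
    logging.getLogger("distributed.worker").info("hello from a worker")

cluster = YarnCluster()
client = Client(cluster)
client.submit(task).result()
print(client.get_worker_logs())  # {worker_address: [...recent records...]}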
Side note
I have tried using the following in ~/.config/dask/distributed.yaml and ~/.config/dask/yarn.yaml
The names of the config files don't matter; dask loads all yaml files in all config directories and merges their contents. For more information, please read the configuration docs.
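You can verify the merging locally. For example, assuming you've dropped the logging config above into any .yaml file under ~/.config/dask/:

import dask

# The key comes from the merged view of every YAML file dask found,
# regardless of which file defined it.
print(dask.config.get("logging", default=None))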
Upvotes: 2