Saurabh

Reputation: 163

How to capture logs from workers from a Dask-Yarn job?

I have tried using the following in ~/.config/dask/distributed.yaml and ~/.config/dask/yarn.yaml:

logging-file-config: "/path/to/config.ini"

or

logging:
  version: 1
  disable_existing_loggers: false

  root:
    level: INFO
    handlers: [consoleHandler]

  handlers:
    consoleHandler:
      class: logging.StreamHandler
      level: INFO
      formatter: sample_formatter
      stream: ext://sys.stderr

  formatters:
    sample_formatter:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

and then in my function that gets evaluated at the worker:

import logging
from distributed.worker import logger
import dask
from dask.distributed import Client
from dask_yarn import YarnCluster

log = logging.getLogger(__name__)

@dask.delayed
def worker_func(args):
    logger.info("This will show up in the worker logs")
    log.info("This does not show up in worker logs")
    return

if __name__ == "__main__":
    dag_1 = {'worker_func': (worker_func, arg_1)}
    tasks = dask.get(dag_1, 'load-1')

    log.info("This also shows up in logs, and custom formatted")
    cluster = YarnCluster()
    client = Client(cluster)
    dask.compute(tasks)

When I try to view the yarn logs using:

yarn logs -applicationId {application_id}

I do not see the log from log.info inside worker_func, but I do see the logs from distributed.worker.logger and from outside that function on the console. I also tried using client.get_worker_logs(), but that returned an empty dictionary. Is there a way to see customized logs from inside the function that gets evaluated at a worker?

Upvotes: 1

Views: 1265

Answers (1)

jiminy_crist

Reputation: 2445

There's a lot going on in this question, so I'm going to answer "How do I configure logging for dask-yarn workers" and hope everything else becomes clear by answering that.

Dask's configuration is loaded locally on the machine you start a dask cluster from (usually the edge node). It is not distributed to the workers automatically; you're responsible for doing that yourself. You have a few options here:

  • Have admin/IT put configuration in /etc/dask/ on every node, which will affect all users.
  • Bundle configuration with your packaged environment. Dask will load configuration from {prefix}/etc/dask/, where prefix is sys.prefix.

For example, if you have a conda environment at /path/to/environment, you'd do the following to bundle the configuration:

# Create the configuration directory in the environment
mkdir -p /path/to/environment/etc/dask/
# Add your configuration to this directory
mv config.yaml /path/to/environment/etc/dask/config.yaml
# Package the environment
conda pack -p /path/to/environment -o environment.tar.gz

Any configuration values set in config.yaml will now be available on all the worker nodes. An example configuration file setting some logging configuration would be:

logging:
  version: 1

  root:
    level: INFO
    handlers: [consoleHandler]

  handlers:
    consoleHandler:
      class: logging.StreamHandler
      level: INFO
      formatter: sample_formatter
      stream: ext://sys.stderr

  formatters:
    sample_formatter:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
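
To see locally what a logging section like this does, here is a minimal sketch that passes the equivalent dictionary to the standard library's logging.config.dictConfig. My understanding is that Dask applies a logging section containing a version key in this way, but treat that as an assumption. The logger name my_module is hypothetical.

```python
import logging
import logging.config

# Python equivalent of the YAML "logging" section above.
LOGGING_CONFIG = {
    "version": 1,
    "root": {"level": "INFO", "handlers": ["consoleHandler"]},
    "handlers": {
        "consoleHandler": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "sample_formatter",
            "stream": "ext://sys.stderr",
        }
    },
    "formatters": {
        "sample_formatter": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        }
    },
}

logging.config.dictConfig(LOGGING_CONFIG)

# "my_module" is a hypothetical logger name. It has no handlers of its own,
# so its records propagate to the root logger configured above.
log = logging.getLogger("my_module")
log.info("Formatted by sample_formatter and written to stderr")
```

Because the handler sits on the root logger, any logger that propagates to root (the default) picks up the same format and destination.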

Logs from completed dask-yarn applications can be retrieved using the YARN CLI:

yarn logs -applicationId <application-id>

Logs for running dask-yarn applications can be retrieved using client.get_worker_logs(). Note that these logs will only contain logs written to the distributed.worker logger. You cannot write to your own logger and have them appear in the output of client.get_worker_logs(). To write to this logger, get it via

import logging
logger = logging.getLogger("distributed.worker")
logger.info("Writing with the worker logger")

Output from any logger configured to write to stdout or stderr will appear in the logs accessed via the YARN CLI, but only output from the distributed.worker logger is also available through get_worker_logs().
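
If bundling a configuration file isn't convenient, one alternative is to attach a stderr handler to your own logger from inside the task. This is only a sketch of that idea, not something dask-yarn provides: get_task_logger and the logger name my_tasks are hypothetical, and it assumes the worker's stderr ends up in the YARN container logs as described above. Output written this way would still not be returned by get_worker_logs().

```python
import logging
import sys


def get_task_logger(name="my_tasks"):
    """Return a logger that writes to stderr, adding its handler only once."""
    log = logging.getLogger(name)
    if not log.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        log.addHandler(handler)
        log.setLevel(logging.INFO)
        # Avoid duplicate lines if the root logger also has a stderr handler.
        log.propagate = False
    return log


def worker_func(x):
    # Set up the logger inside the task so it happens in the worker process.
    log = get_task_logger()
    log.info("Processing %s", x)
    return x
```

The handler check matters because a worker process runs many tasks; without it, every call would add another handler and each message would be printed multiple times.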


Side note

"I have tried using the following in ~/.config/dask/distributed.yaml and ~/.config/dask/yarn.yaml"

The names of the config files don't matter: dask loads all YAML files in all config directories and merges their contents. For more information, please read the configuration docs.
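
To illustrate what merging means here, the sketch below combines two nested dictionaries the way I'd expect the contents of two YAML files to be combined. It is a plain-Python illustration of the idea, not Dask's implementation; merge_nested and the two example dictionaries are hypothetical.

```python
def merge_nested(*configs):
    """Merge nested dicts; values from later dicts win on conflicts."""
    result = {}
    for config in configs:
        for key, value in config.items():
            if isinstance(value, dict) and isinstance(result.get(key), dict):
                # Both sides are dicts, so merge them key by key.
                result[key] = merge_nested(result[key], value)
            else:
                result[key] = value
    return result


# Hypothetical contents of two files in the same config directory.
distributed_yaml = {"logging": {"version": 1, "root": {"level": "INFO"}}}
yarn_yaml = {"logging": {"root": {"handlers": ["consoleHandler"]}}}

merged = merge_nested(distributed_yaml, yarn_yaml)
print(merged)
```

Both files contribute to the same logging section, so splitting settings between distributed.yaml and yarn.yaml makes no difference to the final result.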

Upvotes: 2
